Hot questions for Handling MissingBackpressureException in RxJava

Top 10 Java Open Source / RxJava / Handling MissingBackpressureException

Caused by: rx.exceptions.MissingBackpressureException

Question: I have an other problem. This time I am facing this error Caused by: rx.exceptions.MissingBackpressureException during the execution of this code:

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

The code is just a test code of two process. The first process gets a list of Product from the web, then those products are persist into the a local database within the app. This is the main idea.

The method getProducts do the work of getting the data, in this case I just create an ArrayList of 100 products. The saveRows do the persist work.

The saveRows methods emit juts an Int that represent the saved row. I am doing this because in the UI I have a progressbar reporting the progress.

From an other point of the app I call the method startUpdate and after a few items emitted I get the describe exception

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)

I understand why this exception should be happening https://github.com/ReactiveX/RxJava/wiki/Backpressure but I do not know what I am doing wrong or how to solve it.

Answer: The problem is your Observable source emits faster than the consumer consumes. It takes 100 ms to save each product. You can add onBackpressureBuffer().

UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
      Log.d(TAG, "next $it")
    }, {
      Log.d(TAG, it.message)
    }, {
    })

Also, you can try removing Thread.sleep(100).

flatmap use OperatorMerge ( merge(map(func))): you can see that in your case, map's onNexts are sent faster than have been requested.