Hot questions for Handling MissingBackpressureException in RxJava


Caused by: rx.exceptions.MissingBackpressureException

Question: I have another problem. This time I am getting the error Caused by: rx.exceptions.MissingBackpressureException while running this code:

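import rx.Observable
import rx.subjects.PublishSubject

class UpdateHelper {
    val numberOfFileToUpdate: PublishSubject<Int>

    init {
        numberOfFileToUpdate = PublishSubject.create()
    }

    public fun startUpdate(): Observable<Int> {
        return getProducts().flatMap { products: ArrayList<Product> ->
            return@flatMap saveRows(products)
        }
    }

    private fun getProducts(): Observable<ArrayList<Product>> {
        return Observable.create {
            var products: ArrayList<Product> = ArrayList()
            var i = 0
            while (i++ < 100) {
                // Build the test list of 100 products (a no-arg Product constructor is assumed)
                products.add(Product())
            }
            it.onNext(products)
            it.onCompleted()
        }
    }

    private fun saveRows(products: ArrayList<Product>): Observable<Int> {
        return Observable.create<Int> {
            var totalNumberOfRow = products.size

            while (totalNumberOfRow-- > 0) {
                // Emit the number of rows saved so far
                it.onNext(products.size - totalNumberOfRow)
                // The 100 ms delay per row that the answer refers to
                Thread.sleep(100)
            }
            it.onCompleted()
        }
    }
}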

The code is just a test of a two-step process. The first step gets a list of Product from the web; those products are then persisted into a local database within the app. This is the main idea.

The method getProducts does the work of getting the data; in this case I just create an ArrayList of 100 products. The method saveRows does the persisting.

The saveRows method emits just an Int that represents the saved row. I am doing this because the UI has a progress bar reporting the progress.

From another point in the app I call the method startUpdate, and after a few items have been emitted I get the described exception:

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$

I understand why this exception should be happening but I do not know what I am doing wrong or how to solve it.

Answer: The problem is that your source Observable emits faster than the consumer can consume. It takes 100 ms to save each product. You can add onBackpressureBuffer():

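    updateHelper.startUpdate() // updateHelper: an UpdateHelper instance (name assumed)
        .onBackpressureBuffer() // Add this
        .subscribe({
          Log.d(TAG, "next $it")
        }, {
          Log.d(TAG, it.message)
        }, {
          Log.d(TAG, "completed")
        })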

Also, you can try removing Thread.sleep(100).
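To see the effect of onBackpressureBuffer() outside of Android, here is a minimal sketch against RxJava 1.x. The names fastSource and collectWithBuffer are made up for illustration, and the observeOn boundary with a slow consumer is an assumption about where the bounded queue overflows; the question does not show its subscribe chain. Without the onBackpressureBuffer() line, a source like this would be expected to fail with MissingBackpressureException once the bounded queue fills up.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers

// Hypothetical stand-in for saveRows(): pushes 1..count without checking
// how many items the downstream has requested.
fun fastSource(count: Int): Observable<Int> =
    Observable.create(Observable.OnSubscribe<Int> { subscriber ->
        for (row in 1..count) {
            subscriber.onNext(row)
        }
        subscriber.onCompleted()
    })

// Sends every item through an asynchronous boundary to a slow consumer.
fun collectWithBuffer(count: Int): List<Int> =
    fastSource(count)
        .onBackpressureBuffer()               // unbounded buffer absorbs the burst
        .observeOn(Schedulers.computation())  // the bounded internal queue lives here
        .doOnNext { Thread.sleep(1) }         // simulate a slow consumer
        .toList()
        .toBlocking()
        .single()

fun main() {
    println("received ${collectWithBuffer(500).size} items")
}
```

Note that the buffer is unbounded, so this trades the exception for memory use; for 100 progress values that is unlikely to matter.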

flatMap uses OperatorMerge (it is implemented as merge(map(func))); you can see that in your case, map's onNext calls are sent faster than they have been requested.
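An alternative to buffering is to build the inner Observable from an operator that honors downstream requests, so that merge never receives more than it asked for. The sketch below is not the approach from the answer above; it swaps the hand-written Observable.create for Observable.range from RxJava 1.x. The names saveRow, saveRowsOnDemand and runUpdate are hypothetical, and products are plain strings here to keep the example self-contained.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers

// Hypothetical placeholder for the real database write.
fun saveRow(product: String) {
    // persist the product here
}

// Emits the running row count 1..n. Observable.range only emits as many
// items as the downstream has requested, unlike a raw Observable.create loop.
fun saveRowsOnDemand(products: List<String>): Observable<Int> =
    Observable.range(1, products.size)
        .doOnNext { row -> saveRow(products[row - 1]) }

// Mirrors startUpdate(): one list of products in, progress values out.
fun runUpdate(products: List<String>): List<Int> =
    Observable.just(products)
        .flatMap { saveRowsOnDemand(it) }
        .observeOn(Schedulers.computation())  // async boundary, as on a UI thread hop
        .toList()
        .toBlocking()
        .single()

fun main() {
    val products = List(500) { "product-$it" }
    println("saved ${runUpdate(products).size} rows")
}
```

Because the source produces on demand, no onBackpressureBuffer() call is needed in this variant.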