Hot questions for Handling Exception in RxJava

Top 10 Java Open Source / RxJava / Handling Exception

retrofit with rxjava handling network exceptions globally

Question: I am trying to handle exceptions in app on global level, so that retrofit throws an error i catch it in some specific class with logic for handling those errors

I have an interface

@POST("/token")
AuthToken refreshToken(@Field("grant_type") String grantType, @Field("refresh_token") String refreshToken);

and observables

public Observable<AuthToken> refreshToken(String refreshToken) {
    return Observable.create((Subscriber<? super AuthToken> subscriber) -> {
        try {
            subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken));
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

When i get 401 from server (invalid token or some other network related error) i want to refresh the token and repeat the rest call. Is there a way to do this with rxjava for all rest calls with some kind of observable that will catch this error globally, handle it and repeat the call that throw-ed it?

For now i am using subject to catch the error on .subscribe() like this

private static BehaviorSubject errorEvent = BehaviorSubject.create();

public static BehaviorSubject<RetrofitError> getErrorEvent() {
    return errorEvent;
}

and in some call

getCurrentUser = userApi.getCurrentUser().observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    (user) -> {
                        this.user = user;
                    },
                    errorEvent::onNext
            );

then in my main activity i subscribe to that behaviour subject and parse the error

SomeApi.getErrorEvent().subscribe(
            (e) -> {
                //parse the error
            }
    );

but i cant repeat the call for the observable that throw the error.

Answer: You need to use the operator onErrorResumeNext(Func1 resumeFunction), better explained in the official wiki:

The onErrorResumeNext( ) method returns an Observable that mirrors the behavior of the source Observable, unless that Observable invokes onError( ) in which case, rather than propagating that error to the Subscriber, onErrorResumeNext( ) will instead begin mirroring a second, backup Observable

In your case I would put something like this:

getCurrentUser = userApi.getCurrentUser()
.onErrorResumeNext(refreshTokenAndRetry(userApi.getCurrentUser()))
.observeOn(AndroidSchedulers.mainThread())
            .subscribe(...)

where:

private <T> Func1<Throwable,? extends Observable<? extends T>> refreshTokenAndRetry(final Observable<T> toBeResumed) {
        return new Func1<Throwable, Observable<? extends T>>() {
            @Override
            public Observable<? extends T> call(Throwable throwable) {
                // Here check if the error thrown really is a 401
                if (isHttp401Error(throwable)) {
                    return refreshToken().flatMap(new Func1<AuthToken, Observable<? extends T>>() {
                        @Override
                        public Observable<? extends T> call(AuthToken token) {
                            return toBeResumed;
                        }
                    });
                }
                // re-throw this error because it's not recoverable from here
                return Observable.error(throwable);
            }
        };
    }

Note also that this function can be easily used in other cases, because it's not typed with the actual values emitted by the resumed Observable.


How to handle exceptions thrown by observer's onNext in RxJava?

Question: Consider the following example:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

This outputs numbers from 1 to 5 and then prints the exception.

What I want to achieve is make the observer stay subscribed and continue to run after throwing an exception, i.e. print all numbers from 1 to 10.

I have tried using retry() and other various error handling operators, but, as said in the documentation, their purpose is handling errors emitted by the observable itself.

The most straightforward solution is just to wrap the whole body of onNext into a try-catch block, but that doesn't sound like a good solution to me. In the similar Rx.NET question, the proposed solution was to make an extension method which would do the wrapping by creating a proxy observable. I tried to remake it:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

This does not change anything, because RxJava itself wraps the subscriber into a SafeSubscriber. Using unsafeSubscribe to get around it doesn't seem to be a good solution either.

What can I do to solve this problem?

Answer: This is a common question that arises when learning Rx.

Your suggestion to put your exception handling logic in the subscriber is preferable over creating a generic observable wrapper.

Remember, that Rx is about pushing events to subscribers.

From the observable interface, it's clear there's not really anything an observable can know about it's subscribers other than how long they took to handle an event, or the information contained in any thrown exceptions.

A generic wrapper to handle subscriber exceptions and carry on sending events to that subscriber is a bad idea.

Why? Well the observable should only really know that the subscriber is now in an unknown failure state. To carry on sending events in this situation is unwise - perhaps, for example, the subscriber is in a condition where every event from this point forward is going to throw an exception and take a while to do it.

Once a subscriber has thrown an exception, there are only two viable courses of action for the observable:

  1. Re-throw the exception
  2. Implement generic handling to log the failure and stop sending it events (of any kind) and clean up any resources due to that subscriber and carry on with any remaining subscriptions.

Specific handling of subscriber exceptions would be a poor design choice; it would create inappropriate behavioural coupling between subscriber and observable. So if you want to be resilient to bad subscribers the two choices above are really the sensible limit of responsibility of the observable itself.

If you want your subscriber to be resilient and carry on, then you should absolutely wrap it in exception handling logic designed to handle the specific exceptions you know how to recover from (and perhaps to handle transient exceptions, logging, retry logic, circuit breaking etc.).

Only the subscriber itself will have the context to understand whether it is fit to receive further events in the face of failure.

If your situation warrants developing reusable error handling logic, put yourself in the mindset of wrapping the observer's event handlers rather than the observable - and do take care not to blindly carry on transmitting events in the face of failure. Release It! whilst not written about Rx, is an entertaining software engineering classic has plenty to say on this last point. If you've not read it, I highly advise it.


Handling exceptions inside Observable.fromCallable() when subscription gets cleared

Question: I have a situation where a long running process is wrapped in an Observable.fromCallable(). This process is an OkHttp call and, if terminated, will throw an IOException. If the observable is subscribed to, then the disposable is stored in a CompositeDisposable and the exception is handled as expected. However, my code will clear the CompositeDisposable in some cases, triggering the OkHttp thread termination with no error handling, causing the app to crash with an unhandled exception. Here's a simple unit test example of this problem:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}

Is there any way to work around this problem?

Answer: Unlike RxJava1, RxJava2 will not deliver this Exception to the Subscriber onError(), as you called cancel() to unsubscribe and don't wan't to get notifications anymore, so this kind of Exceptions which happens with the unsubscription code go by default now to Thread.currentThread().getUncaughtExceptionHandler().uncaughtException().

You can either wrap with try catch this kind of exceptions that may happens with cancel, or override the default behavior with:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer()); 

or any other handling you would like.


How to use Observable.fromCallable() with a checked Exception?

Question: Observable.fromCallable() is great for converting a single function into an Observable. But how do you handle checked exceptions that might be thrown by the function?

Most of the examples I've seen use lambdas and "just work". But how would you do this without lambdas? For example

Observable.fromCallable(() -> downloadFileFromNetwork());

It's a one-liner now! It deals with checked exceptions, no more weird Observable.just() and Observable.error() for such easy thing as deferring code execution!

When I attempt to implement the above Observable without a lambda expression, based on other examples I've seen, and how Android Studio auto-completes, I get the following:

Observable.fromCallable(new Func0<File>() {
    @Override
    public File call() {
        return downloadFileFromNetwork();
    }
}

But if downloadFileFromNetwork() throws a checked exception, I have to try-catch it and wrap it in a RuntimeException. There's got to be a better way! How does the above lambda support this?

Answer: Rather than using a Func0 with Observable.fromCallable(), use Callable. For example:

Observable.fromCallable(new Callable<File>() {
    @Override
    public File call() throws Exception {
        return downloadFileFromNetwork();
    }
}

Since Callable's method call() throws Exception, you don't have to wrap your function in a try-catch! This must be what the lambda is using under the hood.


Rxjava 2 exception with camera

Question: I just switched my code from asynctask to rxjava2 and I'm randomly getting this exception on my nexus:

Camera is being used after Camera.release() was called in Galaxy s6 Edge

Following is my code-

Class Camera:

 public class Cameras {

    private Camera camera;
    private boolean cameraReleased = false;
...
   private void releaseCamera() {
        synchronized (this) {

            if (null == camera) return;
            camera.setPreviewCallback(null);

            camera.release();
            camera = null;
            cameraReleased = true;
        }
    }


    public void release() {

        synchronized (this) {
            if (null == camera) return;
            stopPreview(NEXT_TASK_RELEASE_COMPLETE);
        }
    }


    private void releaseComplete() {
        synchronized (this) {
            if (camera != null) {

                camera.release();
                cameraReleased = true;
                camera = null;
            }
        }
        buffer1 = null;
        buffer2 = null;
        buffer3 = null;
    }
}

cameras.java:

if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
            Observable.defer(new Callable<ObservableSource<?>>() {
                @Override
                public ObservableSource<?> call() throws Exception {
                    if (LogDog.isEnabled) LogDog.e("Debug::"+TAG + "::stopPreview()::AsyncTask::doInBackground()", " (camera != null) =" + (camera != null) );
                    synchronized (this) {
                        if ( (null != camera) && (!cameraReleased) ) {
                            if (LogDog.isEnabled)  LogDog.e("Debug::" + TAG + "::stopPreview()::AsyncTask::doInBackground()", " XXX CALL camera.stopPreview()");
                            camera.stopPreview();
                        }
                    }
                    return Completable.complete().toObservable();
                }
            }).doOnComplete(() -> {
                routNextTask(nextTask);
                Log.d("Complete", "Complete");
            })
                    .subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread()).subscribe();

Not sure what I am doing wrong. Any ideas where I can release the camera or allocate it, so it works without any issues? Exception is as follows:

FATAL EXCEPTION: main io.reactivex.exceptions.OnErrorNotImplementedException: Camera is being used after Camera.release() was called 
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) 
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) 
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) 
...

Answer: The code you wrote for releasing the camera is prone to have race-conditions. A small change which could already make a difference is setting the flag before the action happens.

cameraReleased = true;
camera.release();
camera = null;

It might be necessary to recheck the flag just before the release of the Camera, but for this you synchronized the code before. In this context the issue is, that you're calling synchronized(this) within the deferred Observable. Instead you should synchronize on the same instance like everywhere else, because this doesn't refer to the outer class. Instead use this@Cameras

@Override
public ObservableSource<?> call() throws Exception {
    if (LogDog.isEnabled) LogDog.e("Debug::"+TAG + "::stopPreview()::AsyncTask::doInBackground()", " (camera != null) =" + (camera != null) );
    synchronized (this@Cameras) {
        if ( (null != camera) && (!cameraReleased) ) {
            if (LogDog.isEnabled)  LogDog.e("Debug::" + TAG + "::stopPreview()::AsyncTask::doInBackground()", " XXX CALL camera.stopPreview()");
            camera.stopPreview();
        }
    }
    return Completable.complete().toObservable();
}

Additional to this, you're use-case for Observable.defer() doesn't look right. The Completable.fromAction() factory might be more suitable.


Exception handling in rxjava

Question: I am trying to get accustomed to rxjava and I am trying to call the below QuoteReader in an Observable. I am not sure how to handle the exception thrown,

public class QuoteReader {
   public Map<String, Object> getQuote() throws IOException{
       OkHttpClient okHttpClient = new OkHttpClient();
       Request request = new Request.Builder().url("http://quotes.rest/qod.json").build();
       Gson gson = new Gson();
       Map<String, Object> responseMap = null;

       try(Response response = okHttpClient.newCall(request).execute()) {
         responseMap = gson.fromJson(response.body().string(), Map.class);
         System.out.println("response map : "+responseMap);
       } catch(IOException ioe) {
         ioe.printStackTrace();
         throw ioe;
       } finally {
         okHttpClient = null;
         request = null;
       }
    return responseMap;
  }
}

The following is the rx code I am trying to write,

rx.Observable.just(new QuoteReader().getQuote()) //compile time error saying unhandled exception
              .subscribe(System.out::println);

How should I update the code to handle the exception.

Answer: Use fromCallable that allows your method to throw (plus, it gets evaluated lazily and not before you even get into the Observable world):

rx.Observable.fromCallable(() -> new QuoteReader().getQuote())
          .subscribe(System.out::println, Throwable::printStackTrace);

How to subscribe to click events so exceptions don't unsubscribe?

Question: I'm using RxJava for Android (RxAndroid) and I subscribe to click events of a view, and do something on them as follows:

subscription = ViewObservable.clicks(view, false)
    .map(...)
    .subscribe(subscriberA);

The problem is whenever there is an exception, subscriberA automatically unsubscribes, leading to the next click not triggering anything.

How to handle exceptions so to intercept all the click events regardless of exceptions being thrown?

Answer: Use retry method:

subscription = ViewObservable.clicks(view, false)
    .map(...)
    .retry()
    .subscribe(subscriberA)

However, you will not receive any exception in onError. To handle exceptions with retry (resubscribe) logic use retryWhen:

subscription = ViewObservable.clicks(view, false)
    .map(...)
    .retryWhen(new Func1<Observable<? extends Notification<?>>, Observable<?>>() {

        @Override
        public Observable<?> call(Notification errorNotification) {
            Throwable throwable = errorNotification.getThrowable();
            if (errorNotification.isOnError() && handleError(throwable)) {
                // return the same observable to resubscribe
                return Observable.just(errorNotification);
            }
            // return unhandled error to handle it in onError and unsubscribe
            return Observable.error(throwable);
        }

        private boolean handleError(Throwable throwable) {
            // handle your errors
            // return true if error handled to retry, false otherwise
            return true;
        }
    }
    .subscribe(subscriberA)