Hot questions for Handling IllegalStateException in RxJava

RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!"

Question: I'm using RxJava to calculate the normalized autocorrelation over some sensor data on Android. Strangely, my code throws an exception ("java.lang.IllegalStateException: Only one subscriber allowed!") and I'm unsure what to make of it. I know that a GroupedObservable can throw this exception when it is subscribed to by multiple subscribers, but I don't think I'm using one anywhere.

Below is the method that (most likely) triggers the exception:

public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
    Observable<Float> laggedObservable = observable.skip(lag);

    Observable<Float> meanObservable = mean(observable, lag);
    Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);

    Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
    Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

    Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float laggedValue) {
            return value * laggedValue;
        }
    });

    Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
        @Override
        public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
            if(memoObservable == null) return observable;

            return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                @Override
                public Float call(Float memo, Float value) {
                    return memo + value;
                }
            });
        }
    }));

    Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float standardDeviation, Float laggedStandardDeviation) {
            return lag * standardDeviation * laggedStandardDeviation;
        }
    });

    return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float autoCorrelation, Float normalization) {
            return autoCorrelation / normalization;
        }
    });
}

And this is the stacktrace I get:

java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
...

I don't think I'm doing anything strange here: some zips, reduces, scans and flatmaps.

Am I missing something completely obvious, is there some hidden rule that I'm breaking here or is it a bug in RxJava?

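Answer: In RxJava, the operators groupBy and window return observables that can be subscribed to only once. When that single subscriber arrives, they replay the contents accumulated so far to it and then switch to 'hot' mode. In the question's code, window(lag, 1) emits such observables, and the scan accumulator appears to zip earlier windows into every later result, so the same window is most likely subscribed to more than once.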

This was a trade-off between two extremes: returning a fully hot observable, which risks missing values, or returning an unbounded replaying observable, which allows any number of subscribers but retains the accumulated contents indefinitely.

The middle ground, namely a single-subscriber, cold-then-hot observable, is thought to be the least surprising behavior. It gives the developer the option to apply further operators and pick any point between the two extremes:

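// Fully hot: connect each window immediately; late subscribers may miss values.
source.window(1, TimeUnit.SECONDS)
    .map(w -> w.publish())
    .doOnNext(w -> w.connect())
    .subscribe(...)

// Fully replaying: cache each window so it can be subscribed to any number of times,
// at the cost of retaining its contents.
source.window(1, TimeUnit.SECONDS)
    .map(w -> w.cache())
    .subscribe(...)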
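
To make the single-subscriber, cold-then-hot contract described above more concrete, here is a minimal sketch in plain Java with no RxJava dependency. The class SingleSubscriberBuffer and its methods are hypothetical names invented for this illustration. It is not RxJava's BufferUntilSubscriber and leaves out completion, errors and backpressure; it only models the buffering, the replay to the first subscriber, and the rejection of a second one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of a "buffer until subscriber" source.
 * NOT the RxJava implementation; it only mimics the contract discussed above.
 */
final class SingleSubscriberBuffer<T> {
    // Values that arrived before anyone subscribed (the "cold" phase).
    private final Queue<T> buffer = new ArrayDeque<>();
    // Stays null until the one allowed subscriber arrives.
    private Consumer<T> subscriber;

    /** Buffers the value while nobody listens, delivers it directly afterwards. */
    public synchronized void onNext(T value) {
        if (subscriber == null) {
            buffer.add(value);
        } else {
            subscriber.accept(value);
        }
    }

    /** Replays the buffer to the first subscriber and rejects any later one. */
    public synchronized void subscribe(Consumer<T> s) {
        if (subscriber != null) {
            throw new IllegalStateException("Only one subscriber allowed!");
        }
        subscriber = s;
        T value;
        while ((value = buffer.poll()) != null) {
            s.accept(value); // drain what accumulated, then stay "hot"
        }
    }

    public static void main(String[] args) {
        SingleSubscriberBuffer<Integer> window = new SingleSubscriberBuffer<>();
        window.onNext(1);
        window.onNext(2);

        List<Integer> received = new ArrayList<>();
        window.subscribe(received::add); // replays 1 and 2
        window.onNext(3);                // delivered live
        System.out.println(received);    // prints [1, 2, 3]

        try {
            window.subscribe(received::add); // second subscriber is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Only one subscriber allowed!
        }
    }
}
```

In this model, anything that subscribes to the same instance twice (for example, reusing one window in several zips) fails on the second subscription, which is the failure mode reported in the stack trace.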