`combineLatest`, `switchMap` and retaining inner subscriptions

combinelatest pipe
combinelatest not firing
combinelatest vs forkjoin
rxjs combineall
combinelatest unsubscribe
combinelatest duplicate
combinelatest limit
combine latest deprecated

I have a Observable<Array<Observable<T>>> which I want to map to Observable<Array<T>>.

When a new array is emitted, the inner observables should unsubscribe/subscribe as follows:

  • If Observable exists in previous array and the new/current array, retain pre-existing subscription
  • If Observable did not exist in previous array but does exist in new/current array, create new subscription
  • If Observable existed in previous array but does not exist in new/current array, unsubscribe from pre-existing subscription

I hoped to achieve this using switchMap on the outer observable and then passing Array<Observable<T>> into combineLatest. However, switchMap will unsubscribe from its previous inner Observable before subscribing to the new inner Observable, which means inner subscriptions are not retained as desired.

Example (https://stackblitz.com/edit/typescript-b4wgr1). Given code:

import 'rxjs/Rx';
import { Observable } from 'rxjs';

const debugObservable = <T>(t$: Observable<T>, name: string) =>
    new Observable<T>(observer => {
        console.log(name, 'subscribe');
        const subscription = t$.subscribe(observer);
        return () => {
            console.log(name, 'unsubscribe');
            return subscription.unsubscribe();
        };
    });

const ofSingle = <T>(t: T) =>
    new Observable<T>(observer => {
        observer.next(t);
    });

const observableOfArrayOfObservablesOfNumber = new Observable<
    Array<Observable<number>>
>(observe => {
    const keep = debugObservable(ofSingle(1), 'keep');
    const remove = debugObservable(ofSingle(2), 'remove');
    const add = debugObservable(ofSingle(3), 'add');

    observe.next([keep, remove]);

    setTimeout(() => {
        observe.next([keep, add]);
    }, 2000);

    return () => {};
});

// The `switchMap` will unsubscribe to the previous inner observable *before* subscribing to the new
// inner observable.
const final$ = observableOfArrayOfObservablesOfNumber.switchMap(
    arrayOfObservablesOfNumber => {
        const observableOfArrayOfNumbers = Observable.combineLatest(
            arrayOfObservablesOfNumber,
        );
        return debugObservable(
            observableOfArrayOfNumbers,
            'observableOfArrayOfNumbers',
        );
    },
);

final$.subscribe(x => console.log('final', x));

This produces:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
keep unsubscribe <--- bad!
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
keep subscribe <--- bad!
add subscribe
final [1, 3]

However, this is what I desire:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
add subscribe
final [1, 3]

`combineLatest`, `switchMap` and retaining inner subscriptions, combineLatest`, `switchMap` and retaining inner subscriptions - rxjs. in previous array and the new/current array, retain pre-existing subscription If Observable  💡 flatMap is an alias for mergeMap! 💡 If only one inner subscription should be active at a time, try switchMap! 💡 If the order of emission and subscription of inner observables is important, try concatMap! This operator is best used when you wish to flatten an inner observable but want to manually control the number of inner subscriptions.

I came up with a better solution, to use combineLatestHigherOrder from rxjs-etc: https://github.com/cartant/rxjs-etc

https://stackblitz.com/edit/typescript-hfze6m?file=index.ts

`refCount` with delay · Issue #4033 · ReactiveX/rxjs · GitHub, However, switchMap will unsubscribe from its previous inner /combinelatest-​switchmap-and-retaining-inner-subscriptions/51901191. When the source emits, we need to unsubscribe from the previous inner subscription. Because this is a common pattern in Rx, there is a shortcut to achieve the same behaviour — switchMap(). So switchMap() is just map() + switch(). Summary. We learned about higher order observables and the difference between mergeMap() and switchMap().

combineLatest, combineAll can be used to apply combineLatest to emitted observables when a your inner observables is likely not functioning as intended, or a subscription  💡 combineAll can be used to apply combineLatest to emitted observables when a source completes! This operator is best used when you have multiple, long-lived observables that rely on each other for some calculation or determination. Basic examples of this can be seen in example three, where events from multiple buttons are being combined to

Observable, Creates an Observable that, on subscribe, calls an Observable factory to make an combineLatest(other: ObservableInput, project: function): Observable Observable into a first-order Observable by concatenating the inner Observables in order. switchMap(project: function(value: T, ?index: number): ObservableInput,  RxJS comes with the special operators that convert higher-order observables into first-order observables, that we can subscribe to only ones, and receive the event from the inner stream (not the subscription of the inner stream).

Combinelatest switchmap, switchMap — Like mergeMap but when the source Observable emits cancel any previous subscriptions of the inner Observable. What is reactive programming? the inner observable is then subscribed to, and its output is then emitted also by the result observable; when the source observable completes, the result observable also completes; So as we can see the switchMap operator is a great way of doing one HTTP request using the output of an initial request here, so this is one common way that we can use it.

CombineLatest, combineAll can be used to apply combineLatest to emitted observables when a your inner observables is likely not functioning as intended, or a subscription  //each time this is called, it sets up a new subscription to keep all the stream$ as one susbscription, use observable.share() at the end (warning: takes resources, so don't overuse) pipe async

Comments
  • You could do this with custom operator - something that works like combineLatest, but swaps the observables and emitted values like you've described. Are you ok using custom operator or do you insist on combination of existing ones?
  • Custom is fine!
  • This looks very promising, although I'm a bit stuck on implementing it as an RxJS operator…