How to unsubscribe from nested observable

Using the ngOnDestroy() method on our Angular component, we cancel HTTP requests that are still pending when the user leaves a page. On some pages we use a custom generic cache helper to avoid reloading data that has already been loaded.

import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { AsyncSubject } from "rxjs";

@Injectable()
export class HttpCacheHelper {
    public cache = new Map<string, AsyncSubject<any>>();

    public constructor(private readonly http: HttpClient) {        

    }

    public get<T>(url: string): AsyncSubject<T> {
        if (!this.cache.has(url)) {
            const subject = new AsyncSubject<T>();
            this.cache.set(url, subject);
            this.http.get(url)
                .subscribe((data:any) => {
                    subject.next(data as T);
                    subject.complete();
                });
        }
        return this.cache.get(url);
    }
}

If I unsubscribe from the AsyncSubject, my HTTP call is (of course) not cancelled. How can I accomplish that?

In this case you don't have to unsubscribe from the HTTP source (see this question). Generally, you could use the take operator or flatMap for nested subscriptions.
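As a minimal sketch of the flattening idea mentioned above: `mergeMap` (the current name for `flatMap`) replaces a nested `subscribe` with a single subscription, and `take(1)` completes the stream after the first value. The `loadUser` and `loadOrders` functions are hypothetical stand-ins and are not part of the question's code.

```typescript
import { Observable, of } from "rxjs";
import { mergeMap, take } from "rxjs/operators";

// Hypothetical stand-ins for two dependent requests.
function loadUser(id: number): Observable<{ id: number; name: string }> {
  return of({ id, name: "user" + id });
}

function loadOrders(userId: number): Observable<string[]> {
  return of(["order-a-" + userId, "order-b-" + userId]);
}

const received: string[][] = [];

// One outer subscription; mergeMap subscribes to the inner observable for us,
// so unsubscribing from `subscription` tears down both levels.
const subscription = loadUser(7)
  .pipe(
    mergeMap(user => loadOrders(user.id)),
    take(1) // complete after the first emission
  )
  .subscribe(orders => received.push(orders));

subscription.unsubscribe(); // safe to call even after completion
```

Because there is only one Subscription object, there is nothing nested left to clean up separately.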

Get the Subscription object and unsubscribe from it:

import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { AsyncSubject, Subscription } from "rxjs";

@Injectable()
export class HttpCacheHelper {
    public cache = new Map<string, AsyncSubject<any>>();
    private mySubscription: Subscription;
    public constructor(private readonly http: HttpClient) {        

    }

    public get<T>(url: string): AsyncSubject<T> {
        if (!this.cache.has(url)) {
            const subject = new AsyncSubject<T>();
            this.cache.set(url, subject);
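            this.mySubscription = this.http.get(url)
                .subscribe((data:any) => {
                    subject.next(data as T);
                    subject.complete();
                });
        }
        return this.cache.get(url);
    }

    // Call this when the pending request should be cancelled.
    // Note: only the most recent request's subscription is kept here.
    public ngOnDestroy(): void {
        if (this.mySubscription) {
            this.mySubscription.unsubscribe();
        }
    }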

}

You'll need to unsubscribe from the source:

public get<T>(url: string): AsyncSubject<T> {
    if (!this.cache.has(url)) {
        const subject = new AsyncSubject<T>();

        const subscription = this.http.get(url)
            .subscribe((data:any) => {
                subject.next(data as T);
                subject.complete();
            });

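        // Note: this wrapper is not an AsyncSubject, so the cache's value type
        // has to be widened to accept it.
        this.cache.set(url, {
            subscribe(observer) { return subject.subscribe(observer); },
            unsubscribe() { subscription.unsubscribe(); }
        });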
    }

    return this.cache.get(url);
}

You could also try unsubscribing this way.

Create a subject that signals when to unsubscribe:

private onDestroy$ = new Subject<void>();

Add this before any .subscribe() that you need to unsubscribe from:

.pipe(takeUntil(this.onDestroy$))

Example:

this.anyService.getData()
  .pipe(takeUntil(this.onDestroy$))
  .subscribe((data: any) => {
    // Do work ...
  });

Then use ngOnDestroy() like this:

ngOnDestroy() {
  this.onDestroy$.next();
  this.onDestroy$.complete(); // Also complete the subject itself
}

When ngOnDestroy() runs and the subject emits, any subscription piped through takeUntil is automatically unsubscribed.
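The takeUntil pattern can be tried outside a component. The following is only a sketch: `source$` is a hypothetical stand-in for a long-lived stream, and the two calls on `onDestroy$` mimic what ngOnDestroy() would do.

```typescript
import { Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

const onDestroy$ = new Subject<void>();
const source$ = new Subject<number>(); // hypothetical long-lived stream
const seen: number[] = [];

const sub = source$
  .pipe(takeUntil(onDestroy$))
  .subscribe(value => seen.push(value));

source$.next(1);
source$.next(2);

// What ngOnDestroy() would do:
onDestroy$.next();
onDestroy$.complete();

source$.next(3); // ignored: takeUntil already closed the subscription
```

After the notifier emits, `sub.closed` is true and `seen` holds only the values emitted before that point.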

I would just go with a publishReplay and refCount:

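import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { Observable } from "rxjs";
import { publishReplay, refCount } from "rxjs/operators";

@Injectable()
export class HttpCacheHelper {
  public cache = new Map<string, Observable<any>>();

  public constructor(private readonly http: HttpClient) {}

  public get<T>(url: string): Observable<T> {
    if (!this.cache.has(url)) {
      // Replay the last response to late subscribers; refCount unsubscribes
      // from the source (cancelling a pending request) once nobody is subscribed.
      const obs = this.http.get<T>(url).pipe(
        publishReplay(1),
        refCount()
      );

      this.cache.set(url, obs);
    }

    return this.cache.get(url) as Observable<T>;
  }
}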
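To see why this addresses the original question, here is a small sketch of the reference-counting behaviour. `pendingRequest$` is a hypothetical stand-in for http.get() that never responds and only records when it is torn down. It is an assumption for illustration, not Angular's HttpClient.

```typescript
import { Observable } from "rxjs";
import { publishReplay, refCount } from "rxjs/operators";

let started = 0;
let cancelled = false;

// Hypothetical stand-in for a pending HTTP request.
const pendingRequest$ = new Observable<string>(() => {
  started++;
  return () => { cancelled = true; }; // runs when the source is unsubscribed
});

const shared$ = pendingRequest$.pipe(publishReplay(1), refCount());

const first = shared$.subscribe();
const second = shared$.subscribe(); // shares the same underlying request

first.unsubscribe();
const cancelledWhileShared = cancelled; // still false: `second` is subscribed

second.unsubscribe(); // last subscriber leaves, so the source is unsubscribed
```

The source is subscribed once and torn down only when the last consumer unsubscribes. Newer RxJS releases mark publishReplay and refCount as deprecated, so on a recent version it may be worth checking whether shareReplay with its refCount option fits instead.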

Comments
  • Can't you just use the share operator: http.get<any>().pipe(share())?
  • As far as I understand, it unsubscribes on completion of the request. In my case I want to unsubscribe when leaving a component.
  • Why don't you put this.http.get(url) in the cache?
  • The second time I subscribe to the http.get, the call is repeated.
  • As @Marcel-Hoekstra mentioned, you can use the share operator.
  • Does the share operator work with already completed calls? In this case I use it for caching, and the cache can be used minutes after the original call is made.
  • This looks promising! Which type can I use for the cache map? new Map<string, ?????>();
  • I think it can be Observable<T>