Wednesday, November 24, 2021

RxJS: Avoiding memory leaks

I decided to write this article after watching Deborah Kurata’s ng-conf talk on Data Composition with RxJS. Deborah proposed this solution for caching array data. So, what do you think is wrong with the current implementation?

It’s high time to answer this question. First, we do not have any issue here, because Angular’s HttpClient emits the result only once and then completes. But what would happen if the source emitted more than once?

import { interval } from "rxjs";
import { tap, shareReplay } from "rxjs/operators";

const source = interval(1000).pipe(
  tap(console.log),
  shareReplay()
);

const sub = source.subscribe();
sub.unsubscribe();

And here is our output. That is probably not the result you expected to see :) The interval keeps logging values even though we have already unsubscribed.

As you can see, I’ve just replaced HttpClient with the interval observable from RxJS. It could be any observable that emits more than once, for example the one in the sample below.

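import { FormControl, FormGroup, Validators } from "@angular/forms";
import { debounceTime, shareReplay, tap } from "rxjs/operators";

const form = new FormGroup({
  groupName: new FormControl("Test", Validators.required)
});

// Keep the shared observable itself; calling subscribe() here would
// turn `source` into a Subscription.
const source = form.valueChanges.pipe(
  debounceTime(2000),
  tap(console.log),
  shareReplay()
);

const sub = source.subscribe();
sub.unsubscribe();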

shareReplay is designed for "heavy" async calls that you don't want to repeat. Once upon a time, this operator was called cache, but people didn't like that name. By default, shareReplay does no reference counting: once it has been subscribed to, it keeps the underlying subscription to the source alive until the source completes. The idea is to optimistically keep fetching, on the assumption that whatever is being fetched is resource-intensive, slow, or heavy.

For a source that never completes, such as interval, this is exactly what causes the leak. The fix is the configuration object: shareReplay({ refCount: true }). The config parameter was introduced in RxJS 6.4 and carried over into RxJS 7, where the multicasting operators were reworked. With refCount: true you get the behavior you would expect: when all subscribers unsubscribe, the source subscription is unsubscribed as well. More information can be found in Reworking Multicasting: share, connect, and makeConnectable.
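To make the role of reference counting concrete, here is a minimal, dependency-free sketch. It is a simplified model and not how RxJS implements shareReplay; the SharedSource class and the sourceStillRunning helper are hypothetical names introduced only for this illustration.

```typescript
// Simplified model of a shared source. NOT the RxJS implementation.
type Teardown = () => void;

class SharedSource {
  private subscribers = 0;
  private sourceTeardown: Teardown | null = null;
  private connect: () => Teardown; // starts the source, returns its cleanup
  private refCount: boolean;

  constructor(connect: () => Teardown, refCount: boolean) {
    this.connect = connect;
    this.refCount = refCount;
  }

  subscribe(): Teardown {
    if (this.sourceTeardown === null) {
      // The first subscriber starts the underlying source.
      this.sourceTeardown = this.connect();
    }
    this.subscribers++;
    let closed = false;
    return () => {
      if (closed) return;
      closed = true;
      this.subscribers--;
      // Only the ref-counted variant stops the source once nobody listens.
      if (this.refCount && this.subscribers === 0 && this.sourceTeardown !== null) {
        this.sourceTeardown();
        this.sourceTeardown = null;
      }
    };
  }
}

// Reports whether the source is still running after its only subscriber left.
function sourceStillRunning(refCount: boolean): boolean {
  let running = false;
  const shared = new SharedSource(() => {
    running = true;
    return () => {
      running = false;
    };
  }, refCount);
  const unsubscribe = shared.subscribe();
  unsubscribe();
  return running;
}

console.log(sourceStillRunning(false)); // true: the source outlives its subscriber
console.log(sourceStillRunning(true)); // false: the source was stopped
```

The first call mirrors the default behavior described above, the second one mirrors refCount: true.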

Let’s try to update this example in the following way:

import { interval } from "rxjs";
import { tap, shareReplay } from "rxjs/operators";

const source = interval(1000).pipe(
  tap(console.log),
  shareReplay({ refCount: true })
);

const sub = source.subscribe();
sub.unsubscribe();

As you can see, it finally works: the interval no longer keeps running after we unsubscribe.

Another way to work around this problem is publishReplay() + refCount(). Whenever a multicast stream emits multiple values, or does not complete on its own, it may be better to use the combination publishReplay(1) + refCount().

import { interval } from "rxjs";
import { tap, publishReplay, refCount } from "rxjs/operators";

const source = interval(1000).pipe(
  tap(console.log),
  publishReplay(1),
  refCount()
);

const sub = source.subscribe();
sub.unsubscribe();
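In RxJS 7, publishReplay and refCount are marked as deprecated, and roughly the same behavior can be expressed with share and a ReplaySubject connector. The sketch below assumes RxJS 7 or later; the Subject named ticks stands in for the never-ending source so that the example runs synchronously, and finalize is used only to observe when the source subscription is torn down.

```typescript
import { ReplaySubject, Subject } from "rxjs";
import { finalize, share } from "rxjs/operators";

// A Subject stands in for the never-ending source (assumption for this sketch).
const ticks = new Subject<number>();
let sourceClosed = false;

const shared$ = ticks.pipe(
  // Runs when the subscription to the source is torn down.
  finalize(() => {
    sourceClosed = true;
  }),
  share({
    // Replay the latest value to late subscribers.
    connector: () => new ReplaySubject<number>(1),
    // Drop the source subscription when the last subscriber leaves.
    resetOnRefCountZero: true
  })
);

const received: number[] = [];
const sub = shared$.subscribe(value => received.push(value));
ticks.next(1);
ticks.next(2);
sub.unsubscribe();

console.log(received); // [1, 2]
console.log(sourceClosed); // true
```

Setting resetOnRefCountZero to false instead would keep the source subscription alive, which is the default shareReplay() behavior discussed earlier.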

Let’s imagine we use the same pattern in a real Angular application. Instead of tap(console.log), we return the result using switchMap, and instead of subscribing and unsubscribing manually in ngOnDestroy, we use the async pipe (| async). Do you think it prevents the memory leak here? The answer is still no, it does not.

You have probably read many times that the async pipe prevents such memory leaks: Angular manages the subscriptions of | async pipes for us automatically, so there is no need to unsubscribe manually in ngOnDestroy. This leads to less verbose code and hence fewer opportunities for mistakes. However, the async pipe causes the same memory leak if we forget to pass { refCount: true } to shareReplay.

But what happens if we use takeUntil to unsubscribe from multiple observables automatically? You can read more about this pattern in Ben Lesh’s article RxJS: Don’t Unsubscribe. I would suggest treating takeUntil as a generally accepted pattern for unsubscribing when an Angular component is destroyed. To demonstrate, I’ve created a simple Angular application with two buttons, where the result is fetched from a backend server at the click of a button.

import { Component, OnDestroy, OnInit } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { BehaviorSubject, Subject } from "rxjs";
import { tap, shareReplay, combineLatest, takeUntil } from "rxjs/operators";

@Component({
  selector: "app-root",
  template: `<div>
    <h1>Welcome to {{ title }}!</h1>
    <button (click)="getResult()">Get Result</button>
    <button (click)="clearSubscription()">Clear subscription</button>
  </div>`,
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit, OnDestroy {
  constructor(private http: HttpClient) {}

  title = "CodeSandbox";
  sub = new BehaviorSubject<string>("Test");

  // Emits once to signal that subscriptions should be torn down.
  private disposed$ = new Subject<void>();

  ngOnInit(): void {
    this.sub
      .pipe(
        combineLatest([
          this.http.get("https://jsonplaceholder.typicode.com/todos/1"),
          this.http.get("https://jsonplaceholder.typicode.com/users/1")
        ]),
        tap(console.log),
        shareReplay(),
        takeUntil(this.disposed$)
      )
      .subscribe();
  }

  getResult() {
    this.sub.next("Test 4");
  }

  clearSubscription(): void {
    console.log("call unsubscribe");
    this.disposed$.next();
    this.disposed$.complete();
  }

  ngOnDestroy() {
    this.disposed$.next();
    this.disposed$.complete();
  }
}

And here is the result of running the code

The general rule is to place takeUntil as the last operator. However, there are cases in which you might want it as the second-to-last operator. The problem here stems from the shareReplay() behavior that was addressed in feat(shareReplay): add config parameter: without refCount, the source subscription stays alive even after takeUntil has closed the downstream subscription. There are at least three ways to fix this issue, and we can start with the simplest one: swap takeUntil and shareReplay, so that takeUntil becomes the second-to-last operator.

this.sub
  .pipe(
    combineLatest([
      this.http.get("https://jsonplaceholder.typicode.com/todos/1"),
      this.http.get("https://jsonplaceholder.typicode.com/users/1")
    ]),
    tap(console.log),
    takeUntil(this.disposed$),
    shareReplay(),
  )
  .subscribe();
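The effect of the operator order can be checked with a small script. This is a sketch under a few assumptions: a plain Subject replaces the HTTP calls so that everything runs synchronously, finalize is used only to detect when the source subscription is torn down, and sourceReleased is a hypothetical helper name.

```typescript
import { Subject } from "rxjs";
import { finalize, shareReplay, takeUntil } from "rxjs/operators";

// Returns true if the source subscription was torn down after the notifier fired.
function sourceReleased(takeUntilFirst: boolean): boolean {
  const source = new Subject<number>(); // stands in for a long-lived source
  const disposed$ = new Subject<void>(); // notifier, as in the component above
  let released = false;

  const tracked$ = source.pipe(
    finalize(() => {
      released = true;
    })
  );

  const result$ = takeUntilFirst
    ? tracked$.pipe(takeUntil(disposed$), shareReplay())
    : tracked$.pipe(shareReplay(), takeUntil(disposed$));

  result$.subscribe();
  disposed$.next();
  disposed$.complete();
  return released;
}

console.log(sourceReleased(false)); // false: shareReplay() keeps the source alive
console.log(sourceReleased(true)); // true: takeUntil completes the source side
```

With takeUntil placed before shareReplay, the notifier completes the stream that shareReplay is subscribed to, so the source is released even without refCount.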

The second variant has already been used above: we just need to pass the refCount option to shareReplay.

this.sub
  .pipe(
    combineLatest([
      this.http.get("https://jsonplaceholder.typicode.com/todos/1"),
      this.http.get("https://jsonplaceholder.typicode.com/users/1")
    ]),
    tap(console.log),
    shareReplay({refCount: true}),
    takeUntil(this.disposed$)
  )
  .subscribe();

Last but not least, the third variant is taken from John Papa’s ng-conf talk These ARE the Angular tips you are looking for, where he demonstrated the same issue.

For working with subscriptions, John Papa proposed the subsink package. Here is the solution using a SubSink:

private subs = new SubSink();

ngOnInit(): void {
  this.sub
    .pipe(
      combineLatest([
        this.http.get("https://jsonplaceholder.typicode.com/todos/1"),
        this.http.get("https://jsonplaceholder.typicode.com/users/1")
      ]),
      tap(console.log),
      shareReplay(),
    )
    .subscribe();
  this.subs.add(this.sub);
}

ngOnDestroy() {
  this.subs.unsubscribe();
}

Besides the samples above, I would like to propose the solution used in our project, which provides a much more elegant way of handling memory leaks with subscriptions and shareReplay calls. We can use the @ngneat/until-destroy npm package, which gives us a neat way to unsubscribe from observables when the component is destroyed. Let’s see how our application changes after introducing this package.

@UntilDestroy()
@Component({
  selector: "app-root",
  template: `<div>
    <h1>Welcome to {{ title }}!</h1>
    <button (click)="getResult()">Get Result</button>
    <button (click)="clearSubscription()">Clear subscription</button>
  </div>`,
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {

And our ngOnInit:

ngOnInit(): void {
  this.sub
    .pipe(
      untilDestroyed(this),
      combineLatest([
        this.http.get("https://jsonplaceholder.typicode.com/todos/1"),
        this.http.get("https://jsonplaceholder.typicode.com/users/1")
      ]),
      tap(console.log),
      shareReplay(),
    )
    .subscribe();
}

With this package you no longer need to worry about whether takeUntil should be the last or the second-to-last operator: untilDestroyed always comes first in the list, and you do not have to care about the other operators and their order.

Defining the problem

Whenever we use the shareReplay operator, we must be very careful. By default, shareReplay does not unsubscribe from a source that has not yet completed, even when all subscribers are gone. This introduces a memory leak into our application.