I just realized that inner-observables (like those defined in a mergeMap or switchMap operator) do not "stop" even when the outer-observable has no subscription left.

Here is some code that illustrates the behavior:

const {
  Subject,
  of: obsOf,
  concat: obsConcat,
  defer,
} = require("rxjs");
const {
  finalize,
  mergeMap,
  tap,
  takeUntil,
} = require("rxjs/operators");

const subject = new Subject();

obsOf(null).pipe(
  mergeMap(() =>
    obsConcat(
      defer(() => {
        console.log("side-effect 1");
        return obsOf(1);
      }),
      defer(() => {
        console.log("side-effect 2");
        return obsOf(2);
      }),
      defer(() => {
        console.log("side-effect 3");
        return obsOf(3);
      })
    )
  ),
  finalize(() => {
    console.log("finalized");
  })
)
.pipe(
  takeUntil(subject),
  tap((i) => {
    if (i === 2) {
      subject.next();
    }
  })
).subscribe(
  (i) => { console.log("next", i); },
  (e) => { console.log("error", e); },
  () => { console.log("complete"); },
);

// Output:
// > side-effect 1
// > next 1
// > side-effect 2
// > complete
// > finalized
// > side-effect 3

It is weird that the side-effect 3 line is logged, since finalize has already been called on the outer observable.

As all those side-effects are wrapped in a defer, they could easily be skipped after unsubscription. From my point of view, running them at that stage provides no value at all.

Any idea why RxJS still executes them?

1 Answer

This is unfortunately by design (as of RxJS 6): concat buffers the observables and subscribes to each buffered one even after you unsubscribe (if the subscription is already closed, it subscribes and then immediately unsubscribes).

You have to prevent the observables from getting buffered...

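// Same imports and subject as in the question, plus:
// const { concatAll } = require("rxjs/operators");
obsOf(null).pipe(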
  mergeMap(() => obsOf(
    defer(() => {
      console.log("side-effect 1");
      return obsOf(1);
    }),
    defer(() => {
      console.log("side-effect 2");
      return obsOf(2);
    }),
    defer(() => {
      console.log("side-effect 3");
      return obsOf(3);
    })
  )),
  concatAll(),
  finalize(() => {
    console.log("finalized");
  }),
  takeUntil(subject),
  tap((i) => {
    if (i === 2) {
      subject.next();
    }
  })
).subscribe(
  (i) => { console.log("next", i); },
  (e) => { console.log("error", e); },
  () => { console.log("complete"); },
);

The code above appears to work, but only until you delay one of the observables. Replace obsOf(1) with timer(100).pipe(mapTo(1)) and the remaining observables get buffered again, so the behavior is exactly the same as in the question.

The only workaround is to make sure you are not buffering anything (meaning: don't use the concat* operators), or to limit observable production some other way (for example, use a separate Subject and control the production manually).
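
To make the "control the production manually" idea more concrete, here is a minimal sketch in plain JavaScript, deliberately written without RxJS so that it does not depend on the library's internals. It only models the principle: check whether the consumer has stopped before producing the next step, instead of handing all steps to something that buffers them. The names runSteps, makeStep and shouldStop are hypothetical and do not come from RxJS or from the question.

```javascript
// Toy model (no RxJS): each step is a factory whose side effect only
// happens when the factory is called, similar in spirit to defer().
// runSteps, makeStep and shouldStop are hypothetical names.
function runSteps(steps, onValue, shouldStop) {
  const log = [];
  for (const step of steps) {
    // Check for cancellation BEFORE producing the next step, so a
    // stopped consumer never triggers further side effects.
    if (shouldStop()) {
      break;
    }
    const value = step(log);
    onValue(value, log);
  }
  log.push("finalized");
  return log;
}

// Builds a step that records its side effect and returns its value.
function makeStep(n) {
  return (log) => {
    log.push(`side-effect ${n}`);
    return n;
  };
}

let stopped = false;
const result = runSteps(
  [makeStep(1), makeStep(2), makeStep(3)],
  (value, log) => {
    if (value === 2) {
      stopped = true; // plays the role of subject.next() in the question
      return;
    }
    log.push(`next ${value}`);
  },
  () => stopped
);

console.log(result.join("\n"));
```

In this model the third side effect never runs, because the stop flag is consulted before each step is produced. With RxJS, the equivalent would be to gate the creation of each inner observable yourself rather than relying on concat to drop what it has already buffered.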