4
votes

Update

After the solution was found, I wrote a small helper, ng2-rx-collector, based on the accepted answer to make it even easier to use. I hope it helps anybody who keeps running into the same problem.

Original question

Assume we have a component with two subscriptions to hot observables. We subscribe to them in ngOnInit and unsubscribe in ngOnDestroy to avoid memory leaks and unexpected behavior:

public ngOnInit() {
  this.s1 = o1.subscribe(console.debug.bind(console));
  this.s2 = o2.subscribe(console.debug.bind(console));
}

public ngOnDestroy() {
  this.s1.unsubscribe();
  this.s2.unsubscribe();
}

I love Rx, but I literally want to kill myself every time I need to follow this:

  1. Create a private subscription property for each subscription
  2. Assign the subscription to this property (this looks really ugly because the real logic ends up on the right-hand side)
  3. Unsubscribe from each subscription on destroy

Is there any way to improve this?

E.g. RxSwift has a DisposeBag to simplify this process. Translated to TypeScript, it would look like this:

private let bag = DisposeBag();

...

o1.subscribe(...).addDisposableTo(bag);

The bag is then destroyed only once. The problem is that I cannot find any similar function on Subscription.

Any ideas / hints would be warmly welcomed.

2
You can create an array inside a superclass and push your subscriptions to this array. Then, in the superclass's ngOnDestroy, you can unsubscribe from each element of the array. - eko
This does not solve the problem that I need to wrap the whole observable into brackets etc. I already tried that way and even wrote a small decorator which did it automatically, however it does not really look better. - smnbbrv

2 Answers

5
votes

You can do this:

private teardown$ = new Subject<void>();

public ngOnInit() {
    o1.takeUntil(this.teardown$).subscribe(console.debug.bind(console));
    o2.takeUntil(this.teardown$).subscribe(console.debug.bind(console));
}

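public ngOnDestroy() {
    // A single emission ends every subscription that uses takeUntil(this.teardown$).
    this.teardown$.next();
    this.teardown$.complete();
}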
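
To illustrate why a single teardown$.next() is enough, here is a minimal sketch of the mechanism without RxJS. TinySubject and subscribeUntil are hypothetical names invented for this illustration, not RxJS APIs. The real takeUntil also completes the resulting observable, which this sketch leaves out.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical minimal hot source; a stand-in for an RxJS Subject.
class TinySubject<T> {
  private listeners = new Set<Listener<T>>();

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  next(value: T): void {
    // Copy first so listeners may remove themselves while being notified.
    for (const listener of Array.from(this.listeners)) {
      listener(value);
    }
  }
}

// Hypothetical helper: forwards values from source until notifier emits,
// then detaches both listeners.
function subscribeUntil<T>(
  source: TinySubject<T>,
  notifier: TinySubject<void>,
  listener: Listener<T>
): void {
  const stopSource = source.subscribe(listener);
  const stopNotifier = notifier.subscribe(() => {
    stopSource();
    stopNotifier();
  });
}

const teardown = new TinySubject<void>();
const o1 = new TinySubject<number>();
const o2 = new TinySubject<string>();
const seen: Array<number | string> = [];

subscribeUntil(o1, teardown, v => seen.push(v));
subscribeUntil(o2, teardown, v => seen.push(v));

o1.next(1);
o2.next("a");
teardown.next(); // one signal detaches both listeners
o1.next(2);      // no longer delivered
o2.next("b");    // no longer delivered
```

The component never has to store the individual subscriptions, because each one listens for the shared teardown signal itself.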
2
votes

What you describe used to be called "disposables" in RxJS 4 (correct me if I'm wrong), and in RxPHP as well. In RxJS 5 this is called a Subscription, but the purpose is exactly the same.

In the following example I have two source Observables. I wrap both of their unsubscribe calls in a single Subscription object, whose constructor takes a callback that runs when its unsubscribe() method is called.

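// RxJS 5 import; 'rxjs/Rx' loads the full library, including Observable.interval
import { Observable, Subscription } from 'rxjs/Rx';

var source1 = Observable.interval(250);
var source2 = Observable.interval(350);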

let sub1 = source1.subscribe(val => console.log(val));
let sub2 = source2.subscribe(val => console.log(val));

let subscriptions = new Subscription(() => {
    sub1.unsubscribe();
    sub2.unsubscribe();
});

setTimeout(() => {
    subscriptions.unsubscribe();
}, 3000);

Similarly, I can take the Subscription returned by source1.subscribe and use its add() method to attach another Subscription, which is then unsubscribed along with it when unsubscribe() is called:

var source1 = Observable.interval(250);
var source2 = Observable.interval(350);

let subscriptions = source1.subscribe(val => console.log(val));
subscriptions.add(source2.subscribe(val => console.log(val)));

setTimeout(() => {
    subscriptions.unsubscribe();
}, 3000);
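
The same idea can be wrapped into a DisposeBag-style collector, as asked for in the question. The following is only a rough sketch: SubscriptionBag is a hypothetical name, not part of RxJS, and Unsubscribable is a local interface that assumes only an unsubscribe() method, so the example runs without RxJS installed.

```typescript
// Local stand-in for the shape of an RxJS Subscription
// (assumption: only unsubscribe() is needed here).
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical DisposeBag-style collector; not part of RxJS.
class SubscriptionBag {
  private items: Unsubscribable[] = [];
  private closed = false;

  // Returns the bag itself so that calls can be chained.
  add(item: Unsubscribable): this {
    if (this.closed) {
      // Design choice of this sketch: a closed bag tears down new items at once.
      item.unsubscribe();
    } else {
      this.items.push(item);
    }
    return this;
  }

  // Unsubscribes everything collected so far; later calls do nothing.
  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    const pending = this.items;
    this.items = [];
    for (const item of pending) {
      item.unsubscribe();
    }
  }
}

// Usage with fake subscriptions that record when they are torn down.
const log: string[] = [];
const fake = (label: string): Unsubscribable => ({
  unsubscribe: () => {
    log.push(label);
  },
});

const bag = new SubscriptionBag();
bag.add(fake("s1")).add(fake("s2"));
bag.unsubscribe();   // tears down s1 and s2
bag.unsubscribe();   // does nothing the second time
bag.add(fake("s3")); // bag is already closed, so s3 is torn down immediately
```

In a component, something like bag.add(o1.subscribe(...)) would go into ngOnInit and a single bag.unsubscribe() into ngOnDestroy. Note that this still wraps the subscribe call in parentheses, which is the drawback raised in the comments on the question.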

For more info have a look at the source code: https://github.com/ReactiveX/rxjs/blob/master/src/Subscription.ts