21
votes

I have a parent observable that, once it has a subscriber, will do a lookup and emit a single value, then complete.

I'd like to convert that into an observable (or behavior subject or whatever works) that does the following: once it has at least one subscriber, it gets the result from the parent observable (once). Then it emits that value to all of its subscribers, and also emits that single value to all future subscribers, when they subscribe. It should continue with this behavior even if its subscriber count drops to zero.

It seems like this should be easy. Here is what didn't work:

theValue$: Observable<boolean> = parent$
.take(1)
.share()

Other things that didn't work: publishReplay(), publish(). Something that worked better:

theValue$ = new BehaviorSubject<boolean>(false);

parent$
.take(1)
.subscribe( value => theValue$.next(value));

There is a problem with this approach, though: parent$ is subscribed to before theValue$ gets its first subscriber.

Is there a better way to handle this?

3 Answers

17
votes

shareReplay should do what you want:

import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);

shareReplay was added in RxJS version 5.4.0. It returns a reference-counted observable that subscribes to the source (parent$) when the first subscription is made. Subscriptions made after the source completes receive the replayed notifications.
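As a rough sketch of that behavior: the snippet below assumes RxJS 6 or later and uses the pipeable form of shareReplay rather than the patch-style import shown above. The parent$ stand-in and the lookups counter are hypothetical and exist only to make the subscription count visible.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for parent$: counts how often the lookup runs.
let lookups = 0;
const parent$ = new Observable<boolean>(subscriber => {
  lookups += 1;
  subscriber.next(true);
  subscriber.complete();
});

// Nothing has subscribed to parent$ yet, so lookups is still 0 here.
const theValue$ = parent$.pipe(shareReplay(1));

const received: boolean[] = [];

// The first subscriber triggers the single lookup.
const first = theValue$.subscribe(value => received.push(value));
first.unsubscribe(); // subscriber count drops to zero

// A later subscriber gets the replayed value without a second lookup.
theValue$.subscribe(value => received.push(value));

console.log(lookups, received); // 1 [ true, true ]
```

The replay survives the subscriber count dropping to zero here because the source had already completed.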

shareReplay - and refCount in general - is explained in more detail in an article I wrote recently: RxJS: How to Use refCount.

9
votes

I've implemented a method to convert Observables to BehaviorSubjects, as I think that the shareReplay method isn't very readable for future reference.

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
        (x: T) => {
            subject.next(x);
        },
        (err: any) => {
            subject.error(err);
        },
        () => {
            subject.complete();
        },
    );

    return subject;
}

3
votes

This is an improved variant of tmuechsch's answer.

import { Observable, BehaviorSubject } from 'rxjs';

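export function convertObservableToBehaviorSubject<T>(
  observable: Observable<T>,
  initValue: T
): { subject: BehaviorSubject<T>; stopWatching: () => void } {
  const subject = new BehaviorSubject(initValue);
  // A subject is also an observer, so it can be passed straight to subscribe().
  const subscription = observable.subscribe(subject);
  return {
    subject,
    // Call this once the subject is no longer needed.
    stopWatching: () => subscription.unsubscribe()
  };
}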

Be careful because the returned subject never unsubscribes from the source observable. You need to call stopWatching manually when you know that there are no more references to subject (e.g. when a view component is destroyed/unmounted). Otherwise you'll get a memory leak.
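A usage sketch may make the clean-up step more concrete. It assumes RxJS 6 or later; source$ is a hypothetical hot source, and the helper is repeated (with its return type inferred) only so that the snippet runs on its own.

```typescript
import { BehaviorSubject, Observable, Subject } from 'rxjs';

function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T) {
  const subject = new BehaviorSubject<T>(initValue);
  const subscription = observable.subscribe(subject);
  return { subject, stopWatching: () => subscription.unsubscribe() };
}

// Hypothetical source that keeps emitting over time.
const source$ = new Subject<number>();
const { subject, stopWatching } = convertObservableToBehaviorSubject(source$, 0);

source$.next(1);
const valueWhileWatching = subject.value; // 1: the subject mirrors the source

// For example, in a component's destroy/unmount hook:
stopWatching();

source$.next(2);
const valueAfterStop = subject.value; // still 1: the source is no longer observed

console.log(valueWhileWatching, valueAfterStop); // 1 1
```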

It's impossible to make an absolutely safe solution for the given problem. A behavior subject has a value attribute that must always be kept up to date, even while the subject has no subscribers. Therefore, you can't unsubscribe from the source observable automatically when everybody unsubscribes from the subject.

cartant's solution isn't perfect either, because the result is not an instance of BehaviorSubject and shareReplay records values only while it is subscribed to.
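The last point can be illustrated with a small sketch, again assuming RxJS 6 or later and a hypothetical hot source$. A value emitted before anything subscribes to the shared observable is never recorded:

```typescript
import { Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical hot source that emits regardless of who is listening.
const source$ = new Subject<number>();
const shared$ = source$.pipe(shareReplay(1));

// Nobody has subscribed to shared$ yet, so this value is not recorded.
source$.next(1);

const seen: number[] = [];
shared$.subscribe(value => seen.push(value));

// Now shared$ is connected to the source, so this value is recorded.
source$.next(2);

// A late subscriber gets the recorded value replayed.
const seenLate: number[] = [];
shared$.subscribe(value => seenLate.push(value));

console.log(seen, seenLate); // [ 2 ] [ 2 ]
```

A BehaviorSubject fed by an eager subscription would have captured the first value as well, which is the trade-off between the two approaches.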