3
votes

When using dependency injection in Angular, I often need to subscribe to an observable that hasn't been created yet.

I often end up using something like this:

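import { BehaviorSubject, EMPTY, Observable, timer } from 'rxjs';
import { flatMap } from 'rxjs/operators';

// create behavior subject OF Observable<number>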
const subject = new BehaviorSubject<Observable<number>>(EMPTY);

// subscribe to it, using flatMap such as to 'unwrap' the observable stream
const unwrappedSubject = subject.pipe(flatMap((x: Observable<number>) => x));
unwrappedSubject.subscribe(s => console.log(s));

// now actually create the observable stream
const tim = timer(1000, 1000);

// set it into the subject
subject.next(tim);

This uses flatMap to 'unwrap' the observable contained in the subject.
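
One detail of this pattern worth spelling out: `flatMap(x => x)` behaves like `mergeAll()`, so a source pushed earlier stays subscribed after a new one is pushed. If the intent is to replace the source, `switchAll()` drops the previous one instead. The sketch below is only an illustration, assuming RxJS 6 or 7 import paths; it uses two hand-driven `Subject`s (not part of the original code) so the ordering is deterministic.

```typescript
import { BehaviorSubject, EMPTY, Observable, Subject } from 'rxjs';
import { mergeAll, switchAll } from 'rxjs/operators';

// Two hand-driven sources, used here only to make emission order deterministic.
const first = new Subject<number>();
const second = new Subject<number>();

const holder = new BehaviorSubject<Observable<number>>(EMPTY);

const merged: number[] = [];
const switched: number[] = [];

// mergeAll() matches flatMap(x => x): every pushed source stays subscribed.
holder.pipe(mergeAll()).subscribe(v => merged.push(v));
// switchAll() unsubscribes from the previous source when a new one arrives.
holder.pipe(switchAll()).subscribe(v => switched.push(v));

holder.next(first);
holder.next(second);

first.next(1);  // still delivered through mergeAll, dropped by switchAll
second.next(2); // delivered through both

console.log(merged);   // [1, 2]
console.log(switched); // [2]
```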

This works fine, but frankly it always feels 'icky'.

What I really want is something like this, where the consumer of the subject treats the instance of the Subject as Observable<number> without having to pipe it every usage.

const subject = new UnwrappingBehaviorSubject<number>(EMPTY);
subject.subscribe((x: number) => console.log(x));

// this could use 'next', but that doesn't feel quite right
subject.setSource(timer(1000, 1000));

I'm aware that I could subscribe to the timer and hook it up directly to the subject, but I also want to avoid an explicit subscribe call because that complicates the responsibility of unsubscribing.

timer(1000, 1000).subscribe(subject);

Is there a nice way to achieve this?

The Subject.ts and BehaviorSubject.ts source files get more complicated than I expected. I'm scared I'll end up with horrible memory leaks if I try to fork it.

1
I would be more interested in a real example as to why you think you need this complex setup. Typically, I define my observables in the constructor of whatever's being injected. - Joshua McCarthy
It is possible to define all observables outside the constructor, at the variable definition itself. RxJS is built upon data streams (1 observable & n data emits) and not observable mutation (n observables & n data emits). I would try to work around this by changing the providing service so its observables are defined directly at variable definition, instead of using complex pipe solutions in the receiving component. If you have no opportunity to change the providing services, an "ugly" workaround like yours is probably the best solution. You could hide your workaround by writing a custom operator for your needs. - Jonathan Stellwag
Shot in the dark: defer? - customcommander
@customcommander nice shot in the dark with defer, but he has his initial subscription (first subscribe) before he knows what observable should be created. So the observable factory will be called once on the first subscribe, which means he will not be able to dynamically add more observables, which is the end goal. - Християн Христов
@customcommander thanks for the tip. I use that in some places, but as Християн explained, that's not a general solution to this. I can definitely create a class or utility structure to clean up my code, but I'd love to see a 'purer' way to do this. If it's ok for Subject to have next and asObservable (to prevent external next-ing), then I don't see why it's breaking any 'contracts' to have setSource or similar. - Simon_Weaver

1 Answer

1
vote

I think this would be another way to solve it:

foo.component.ts

import { Observable, Subscription, timer } from 'rxjs';
import { share } from 'rxjs/operators';

export class FooComponent {
  private futureObservable$ = new Observable(subscriber => {
    // 'Saving' the subscriber for when the observable is ready.
    this.futureObservableSubscriber = subscriber;

    // The returned teardown function runs once the Subject created by `share` (below)
    // has no subscribers left (after having had at least one).
    return () => this.futureObservableSubscription.unsubscribe();
  }).pipe(
    // You can mimic the Subject behavior from your initial solution with the
    // help of the `share` operator. What it essentially does is *place*
    // a Subject instance here; if multiple subscriptions occur, this Subject instance
    // will keep track of all of them.
    // Also, when the first subscriber is registered, the observable source (the Observable
    // constructor's callback) will be invoked.
    share()
  );
  private futureObservableSubscriber = null;
  // We're using a subscription so that it's easier to collect subscriptions to this observable.
  // It's also easier to unsubscribe from all of them at once.
  private futureObservableSubscription = new Subscription();
  
  constructor (/* ... */) {};

  ngOnInit () {
    // If you're using `share`, you're safe to have multiple subscribers.
    // Otherwise, the Observable's callback(i.e `subscriber => {...}`) will be called multiple times.
    this.futureObservable$.subscribe(/* ... */);
    this.futureObservable$.subscribe(/* ... */);
  }

  whenObservableReady () {
    const tim = timer(1000, 1000);
    
    // Here we're adding the subscription so that it is unsubscribed when the main observable
    // is unsubscribed. That happens in the function returned from the Observable's callback.
    this.futureObservableSubscription.add(tim.subscribe(this.futureObservableSubscriber));
  }
};

Indeed, a possible downside is that you'll have to subscribe explicitly, e.g. in the whenObservableReady method.

With this approach you can also have different sources:

whenAnotherObservableReady () {
  // If you omit this, it should mean that you will have multiple sources at the same time.
  this.cleanUpCrtSubscription();
  
  const tim2 = timer(5000, 5000);
  this.futureObservableSubscription.add(tim2.subscribe(this.futureObservableSubscriber));
}

private cleanUpCrtSubscription () {
  // Removing the subscription created from the current observable(`tim`).
  this.futureObservableSubscription.unsubscribe();
  this.futureObservableSubscription = new Subscription();
}
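
For comparison, the subject-of-observables idea from the question can also be hidden behind a small class, so consumers only ever see an `Observable<T>` and nobody calls `subscribe` on the source by hand. This is a minimal sketch, not an RxJS API: the class name `SwitchableSource` and the member `value$` are hypothetical, and it assumes RxJS 6 or 7 import paths. It uses `switchAll()`, so setting a new source replaces the previous one.

```typescript
import { BehaviorSubject, EMPTY, Observable, of } from 'rxjs';
import { switchAll } from 'rxjs/operators';

// Hypothetical helper; the name and shape are illustrative, not part of RxJS.
class SwitchableSource<T> {
  // Holds the current source; starts with EMPTY so early subscribers just wait.
  private readonly sources = new BehaviorSubject<Observable<T>>(EMPTY);

  // What consumers subscribe to: the values of whichever source is current.
  readonly value$: Observable<T> = this.sources.pipe(switchAll());

  // Replaces the current source; switchAll unsubscribes from the previous one.
  setSource(source: Observable<T>): void {
    this.sources.next(source);
  }
}

const source = new SwitchableSource<number>();
const received: number[] = [];
const subscription = source.value$.subscribe(x => received.push(x));

source.setSource(of(1, 2));
source.setSource(of(3));
subscription.unsubscribe();
source.setSource(of(4)); // nobody is subscribed any more, so nothing is recorded

console.log(received); // [1, 2, 3]
```

Because the inner subscription is owned by `switchAll()`, it is released when the consumer unsubscribes from `value$`. Note that each subscriber to `value$` gets its own subscription to the current source; piping through `share()` as in the answer above would be one way to change that.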