Angular2 2.0.0-beta.15, Rxjs 5.0.0-beta.2

I have two simple RxJS observables: one shared (.share() called on it) and one not.

I have an Angular 2 component which subscribes to both observables in its constructor. Its template also displays their values, so it subscribes to the same observables a second time through the async pipe.

I click a button that calls .next() on both observers with a random value.

The template updates and shows the latest value of each. The subscription function in the component does NOT fire for the non-shared observable. Why?

plunkr: https://plnkr.co/edit/IIWvnTT1zLit1eUNknkk?p=preview

@Component({
    selector: 'my-app',
    template: `
        <div>
            <div>obs1 value: {{ observable1 | async }}</div>
            <div>obs2 value: {{ observable2 | async }}</div>
            <button (click)="randomizeIt()">randomize it</button>

            <br>
            <h3>Question 1</h3>
            Why is obs1's initial value (from startWith) not displayed?

            <h3>Question 2</h3>
            Why does our subscription to obs2 never fire on new values?<br>
            check console.log to see what i mean.
        </div>`,
})
export class App {

    public observable1:Observable<string>;
    public observer1:any;
    public observable2:Observable<string>;
    public observer2:any;

    constructor() {
        this.observable1 = Observable.create(observer => {
            this.observer1 = observer;
        }).startWith("initial obs 1 value").share();

        this.observable1.subscribe((val) => {
            console.log('YOU WILL SEE THIS:', val);
        })

        this.observable2 = Observable.create(observer => {
            this.observer2 = observer;
        }).startWith("initial obs 2 value"); // no share!

        this.observable2.subscribe((val) => {
            console.log('WHY DOES THIS NEVER FIRE ON NEW VALUES?:', val);
        })

    }

    public randomizeIt() {
        let r = Math.random();
        console.log('set both obs to: ', r);
        this.observer1.next(r);
        this.observer2.next(r);
    }   

}

Thanks in advance!

1 Answer

Q1. Why is obs1's initial value (from startWith) not displayed?

From the share() docs:

Returns an observable sequence that shares a single subscription to the underlying sequence.

This means that, in your case, {{ observable1 | async }} and this.observable1.subscribe(...) share one subscription to the underlying sequence. When you subscribed to observable1 in the constructor, you started that sequence, and the startWith value was emitted right then. By the time the view initializes, the first value has already gone out, so async calls subscribe() but joins a sequence that is already running and never receives that first emission.

So, if you move the subscription into ngAfterViewInit(), the async pipe subscribes first and receives the initial value.

ngAfterViewInit() {
    this.observable1.subscribe((val) => {
        console.log('YOU WILL SEE THIS CHANGE:', val);
    });
}
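
To see why subscription order decides who receives the startWith value, here is a minimal sketch that models the behaviour without RxJS. The names subscribeShared, emit, firstSubscriberLog and lateSubscriberLog are hypothetical, and the model assumes the simplest case: the underlying sequence starts once, on the first subscriber, and emits its initial value at that moment.

```typescript
// Hypothetical, simplified model of `startWith(...).share()`; NOT RxJS itself.
type Listener = (value: string) => void;

const listeners: Listener[] = [];
let sourceStarted = false;

// Delivers a value to everyone who is subscribed right now.
function emit(value: string): void {
  for (const listener of listeners) {
    listener(value);
  }
}

// Models subscribing to the shared observable.
function subscribeShared(listener: Listener): void {
  listeners.push(listener);
  if (!sourceStarted) {
    // First subscriber: the underlying sequence starts exactly once,
    // so the initial value is emitted exactly once, to whoever is present.
    sourceStarted = true;
    emit("initial obs 1 value");
  }
}

const firstSubscriberLog: string[] = [];
const lateSubscriberLog: string[] = [];

// Whoever subscribes first (the constructor, in the question) gets the initial value.
subscribeShared((v) => { firstSubscriberLog.push(v); });
// A later subscriber (the async pipe, in the question) joins a running sequence.
subscribeShared((v) => { lateSubscriberLog.push(v); });

emit("0.42"); // a later value reaches both subscribers

console.log(firstSubscriberLog); // initial value, then "0.42"
console.log(lateSubscriberLog);  // only "0.42"
```

Swapping the two subscribeShared calls swaps the outcome, which is what moving the component's subscription into ngAfterViewInit() achieves.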

Q2. Why does our subscription to obs2 never fire on new values?

Every time you subscribe to a cold Observable, the function you passed to Observable.create() runs again, so each subscriber gets its own independent execution with its own observer instance. Your subscription in the constructor is execution 1 with observer instance 1, and the subscription made by async is execution 2 with observer instance 2. Because each run assigns this.observer2, the second run overwrites the first. So, when randomizeIt() calls this.observer2.next(), it reaches only observer instance 2, the one feeding async.
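
The overwrite can be reproduced in a few lines. This is only a sketch: ColdObservable, MiniObserver and the component object are hypothetical stand-ins written for illustration, not RxJS or Angular types. The one assumption carried over from RxJS is that the producer function runs once per subscribe() call.

```typescript
// Hypothetical stand-in for a cold Observable; NOT the RxJS implementation.
interface MiniObserver {
  next(value: number): void;
}

class ColdObservable {
  private readonly producer: (observer: MiniObserver) => void;

  constructor(producer: (observer: MiniObserver) => void) {
    this.producer = producer;
  }

  // Every subscribe() call runs the producer again with a brand-new observer.
  subscribe(listener: (value: number) => void): void {
    this.producer({ next: listener });
  }
}

// Plays the role of the component and its `observer2` field.
const component: { observer2: MiniObserver | null } = { observer2: null };

const observable2 = new ColdObservable((observer) => {
  component.observer2 = observer; // overwritten on every subscription
});

const constructorLog: number[] = [];
const asyncPipeLog: number[] = [];

observable2.subscribe((v) => { constructorLog.push(v); }); // constructor subscription
observable2.subscribe((v) => { asyncPipeLog.push(v); });   // later async-pipe subscription

// Like randomizeIt(): only the most recently captured observer is reachable.
function randomizeIt(value: number): void {
  if (component.observer2 !== null) {
    component.observer2.next(value);
  }
}

randomizeIt(42);

console.log(constructorLog); // stays empty
console.log(asyncPipeLog);   // contains only 42
```

The first observer is never told anything because no reference to it survives the second subscription.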

To make your observable "hot":

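// Cold source: on its own, each subscriber would get a separate execution.
let obs = Observable.create(observer => {
    this.observer2 = observer;
}).startWith("initial obs 2 value");
// publish() returns a connectable observable that shares one execution.
this.observable2 = obs.publish();

this.observable2.subscribe((val) => {
    console.log('WHY DOES THIS NEVER FIRE?:', val);
})
// Nothing is emitted until connect() subscribes to the source.
this.observable2.connect();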

Update

Let's say you have an observable that emits a random value:

this.observable3 = Observable.create(observer => {
    observer.next(Math.random());
});

Now, every time you subscribe to this.observable3 you will get a new random value, not the same value for every subscriber, because each subscriber gets its own execution.

From the docs:

It helps to think of cold and hot Observables as movies or performances that one can watch ("subscribe").

Cold Observables: movies.
Hot Observables: live performances.
Hot Observables replayed: live performances recorded on video.

Whenever you watch a movie, your run of the movie is independent of anyone else's run, even though all movie watchers see the same effects. On the other hand, a live performance is shared to multiple viewers. If you arrive late to a live performance, you will simply miss some of it. However, if it was recorded on video (in RxJS this would happen with a BehaviorSubject or a ReplaySubject), you can watch a "movie" of the live performance. A .publish().refCount() live performance is one where the artists quit playing when no one is watching, and start playing again when there is at least one person in the audience.
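
The "recorded on video" case can be sketched the same way. LastValueSubject below is a hypothetical miniature that only imitates the one BehaviorSubject trait relevant here (remembering the latest value and handing it to late subscribers); it is not the RxJS class and leaves out completion, errors and unsubscription.

```typescript
// Hypothetical miniature of a BehaviorSubject-style replay; NOT the RxJS class.
class LastValueSubject {
  private readonly listeners: Array<(value: string) => void> = [];
  private current: string;

  constructor(initial: string) {
    this.current = initial;
  }

  // Remember the value, then push it to every current subscriber.
  next(value: string): void {
    this.current = value;
    for (const listener of this.listeners) {
      listener(value);
    }
  }

  // A new subscriber immediately receives the most recent value.
  subscribe(listener: (value: string) => void): void {
    this.listeners.push(listener);
    listener(this.current);
  }
}

const subject = new LastValueSubject("initial obs 2 value");
const earlyLog: string[] = [];
const lateLog: string[] = [];

subject.subscribe((v) => { earlyLog.push(v); }); // present from the start
subject.next("0.42");
subject.subscribe((v) => { lateLog.push(v); });  // arrives late, still gets "0.42"
subject.next("0.99");

console.log(earlyLog); // initial value, "0.42", "0.99"
console.log(lateLog);  // "0.42", "0.99"
```

With this shape there is a single shared producer (the subject itself), so both the component's subscription and the template's subscription see every new value, and a late subscriber still starts with something to display.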