5
votes

Is there a way to have a hot observable from an EventEmitter (or equivalent available in Angular 2 alpha 46 / RxJS 5 alpha)? i.e. if we subscribe after the value is resolved, it triggers with the previously resolved value. Similar to what we have when always returning the same promise.

Ideally, only using Angular 2 objects (I read somewhere a light RxJS would be embedded later to remove the dependency), otherwise importing RxJS is fine. AsyncSubject seems to match my need, but it is not available in RxJS 5 alpha.

I tried the following, without success (never triggers). Any idea about how to use it?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

Usecase: reach some async objects only once (for example a series of HTTP calls merged/wrapped in a new EventEmitter), but provide the resolved async object to any service/component subscribing to it, even if they subscribe after it is resolved (the HTTP responses are received).

EDIT: the question is not about how to merge HTTP responses, but about how to get a (hot?) observable from EventEmitter, or any equivalent available with Angular 2 alpha 46 / RxJS 5 alpha, that can be subscribed to after the async result is retrieved/resolved (HTTP is just an example of an async origin). myEventEmitter.share() does not work (cf. plunker above), although it works with the Observable returned by HTTP (cf. plunker from @Eric Martinez). And as of Angular 2 alpha 46, the .toRx() method no longer exists; the EventEmitter is itself the observable and subject.

This works well with promises as long as we always return the same promise object. Since observables were introduced with the Angular 2 HTTP services, I would like to avoid mixing promises and observables (and observables are said to be more powerful than promises, so they should allow what is easy with promises).

Specs about share() (I haven't found doc for version 5 alpha - version used by Angular 2) - working on the Observable returned by the Angular 2 HTTP service, not working on EventEmitter.

EDIT: clarified why not using the Observable returned by HTTP and added that not using RxJS directly would be even better.

EDIT: changed description: the concern is about multiple subscriptions, not merging the HTTP results.

Thanks!

2
I have an example of http using share in this plnkr. Why do you ask about converting an EventEmitter to something when your use case is using the Http module? - Eric Martinez
My description was misleading; the use case is slightly more complex: a series of HTTP calls, and once they complete, I merge/wrap the results in a new Observable. Equivalent with promises (Angular 1 syntax): return $q.all(httpPromisesToResolve); - Antoine
I don't think you need share() for this. What you need is flatMap. Check this example - Eric Martinez
Actually my concern is more about how to have multiple subscribers while triggering the HTTP requests / async process only once, regardless of what transformations are done on the results, although it is great to see what is possible with flatMap (I learnt it, thanks :)). The AsyncSubject suggested by @user3743222 seems to be the solution; now I am checking how to use it in Angular 2 with the RxJS lib v.5 alpha. - Antoine
share() works well on the Observable returned by HTTP (I just checked, it does the job and converts it to a hot observable), but it doesn't work on EventEmitter. - Antoine

2 Answers

3
votes

The functionality you seem to be describing is not that of a cold observable but rather that of an Rx.BehaviorSubject. Have a look here for an explanation of RxJS subjects: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md.

I quote from there :

BehaviourSubject is similar to ReplaySubject, except that it only stored the last value it published. BehaviourSubject also requires a default value upon initialization. This value is sent to observers when no other value has been received by the subject yet. This means that all subscribers will receive a value instantly on subscribe, unless the Subject has already completed.

The Rx.AsyncSubject would be the closest in behaviour to a promise :

AsyncSubject is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the sequence is completed. You can use the AsyncSubject type for situations when the source observable is hot and might complete before any observer can subscribe to it. In this case, AsyncSubject can still provide the last value and publish it to any future subscribers.
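
To make the promise analogy concrete, here is a minimal, dependency-free sketch of the semantics described in the quote. MiniAsyncSubject and its members are hypothetical names written for illustration only; this is not the RxJS implementation and it ignores errors and unsubscription.

```typescript
// Hypothetical sketch of AsyncSubject-like semantics (not the RxJS class).
type ValueListener<T> = (value: T) => void;

class MiniAsyncSubject<T> {
  private listeners: ValueListener<T>[] = [];
  private hasValue = false;
  private last: T | undefined;
  private completed = false;

  // Remember only the most recent value; nothing is emitted yet.
  next(value: T): void {
    if (this.completed) return;
    this.last = value;
    this.hasValue = true;
  }

  // On completion, publish the last value to everyone already waiting.
  complete(): void {
    if (this.completed) return;
    this.completed = true;
    if (this.hasValue) {
      this.listeners.forEach(l => l(this.last as T));
    }
    this.listeners = [];
  }

  // Late subscribers get the stored value immediately,
  // much like calling then() on an already resolved promise.
  subscribe(listener: ValueListener<T>): void {
    if (this.completed) {
      if (this.hasValue) listener(this.last as T);
    } else {
      this.listeners.push(listener);
    }
  }
}

const asyncReceived: string[] = [];
const asyncSubject = new MiniAsyncSubject<string>();
asyncSubject.subscribe(v => asyncReceived.push("early:" + v));
asyncSubject.next("a");
asyncSubject.next("b");
asyncSubject.complete();
asyncSubject.subscribe(v => asyncReceived.push("late:" + v));
```

Both the early and the late subscriber end up with only the final value, which is the "subscribe after it is resolved" behaviour asked for in the question.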

Two more comments:

  • in your plunker : this._coldObservable = emitter.share();. Using share returns a hot observable!
  • EventEmitter actually extends subject in the first place
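
A small sketch of why being hot is not enough for the use case, assuming a plain subject with no replay buffer. MiniSubject is a hypothetical stand-in written for this example; it is neither Angular's EventEmitter nor an RxJS class.

```typescript
// Hypothetical minimal hot subject: values are pushed to current subscribers only.
type ValueHandler<T> = (value: T) => void;

class MiniSubject<T> {
  private handlers: ValueHandler<T>[] = [];

  // Deliver the value to whoever is subscribed right now; nothing is stored.
  next(value: T): void {
    this.handlers.forEach(h => h(value));
  }

  subscribe(handler: ValueHandler<T>): void {
    this.handlers.push(handler);
  }
}

const hotLog: string[] = [];
const hotSubject = new MiniSubject<number>();
hotSubject.subscribe(v => hotLog.push("early:" + v));
hotSubject.next(1);
// This subscriber arrives after the value was emitted and never sees it.
hotSubject.subscribe(v => hotLog.push("late:" + v));
```

Only the early subscriber records the value here, so some form of replay (Behavior, Replay or Async subject) is needed on top of a hot source.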

UPDATE : Wrapping an EventEmitter in an Rx.Observable:

function toRx ( eventEmitter ) {
  return Rx.Observable.create(function ( observer ) {
    eventEmitter.subscribe(function listener ( value ) {observer.onNext(value)});
    // Ideally you also manage error and completion, if that makes sense with Angular2
    return function () {
      /* manage end of subscription here */
    };
  });
}

Once you have that Rx.Observable, you can apply share(), shareReplay(1), anything you want.

My bet is that the Angular team will sooner or later propose a bridging function, but if you don't want to wait, you can do it yourself.

1
votes

ReplaySubject does what I was looking for. @robwormald provided a working example on Gitter, which I slightly modified to better demonstrate the behavior.

Exposing HTTP response:

import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'

@Injectable()
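export class PeopleService {
  // Replays the last emitted value to every subscriber, including late ones
  people: ReplaySubject<any>;

  constructor(http:Http) {
    this.people = new ReplaySubject(1);

    // The subject is used as the observer, so the request is made only once
    http.get('api/people.json')
      .map(res => res.json())
      .subscribe(this.people);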
  }
}

Subscribing multiple times:

// ... annotations
export class App {
  constructor(peopleService:PeopleService) {
    const people = peopleService.people;

    people.subscribe(v => {
      console.log(v)
    });

    //some time later

    setTimeout(() => {
      people.subscribe(v => {
        console.log(v)
      });
      people.subscribe(v => {
        console.log(v)
      });
    },2000)
  }
}

Full plunker

EDIT: the BehaviorSubject is an alternative. In this usecase, the difference is the initial value, for example if we want to display content from cache before updating with the HTTP response.
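
The initial-value difference can be sketched without any library. MiniReplayOne and MiniBehavior below are hypothetical, simplified classes written for illustration; they are not the RxJS ReplaySubject or BehaviorSubject and they omit completion, errors and unsubscription.

```typescript
// Hypothetical sketch: replay-last-value subject vs. one seeded with an initial value.
type ValueCallback<T> = (value: T) => void;

class MiniReplayOne<T> {
  protected callbacks: ValueCallback<T>[] = [];
  protected hasValue = false;
  protected current: T | undefined;

  // Store the value and push it to current subscribers.
  next(value: T): void {
    this.current = value;
    this.hasValue = true;
    this.callbacks.forEach(cb => cb(value));
  }

  // New subscribers immediately receive the stored value, if there is one.
  subscribe(cb: ValueCallback<T>): void {
    this.callbacks.push(cb);
    if (this.hasValue) cb(this.current as T);
  }
}

class MiniBehavior<T> extends MiniReplayOne<T> {
  // Same replay behaviour, but it starts out already holding a value.
  constructor(initial: T) {
    super();
    this.current = initial;
    this.hasValue = true;
  }
}

const replayLog: string[] = [];
const behaviorLog: string[] = [];
const replayPeople = new MiniReplayOne<string>();
const behaviorPeople = new MiniBehavior<string>("cached");

replayPeople.subscribe(v => replayLog.push(v));     // nothing yet
behaviorPeople.subscribe(v => behaviorLog.push(v)); // gets "cached" right away

replayPeople.next("http");
behaviorPeople.next("http");

replayPeople.subscribe(v => replayLog.push("late:" + v));
behaviorPeople.subscribe(v => behaviorLog.push("late:" + v));
```

In this sketch the seeded variant delivers the cached value before the HTTP-like value arrives, while the replay-only variant stays silent until the first real value.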