13
votes

I'm fairly new to RxJS and would like to understand the best way to work with Rx in combination with Promises.

What I want to create is a service in Angular that acts much like an event dispatcher and emits an event once a promise is complete. I also require that, if there are no (event) subscribers, the observable never gets called. Finally, any subsequent subscribers to the observable should get the same result without triggering another request to the server. I have managed to implement my own solution here:

// ... CountryService code

var COUNTRIES_LOADED = Rx.Observable
    .create(function (observer) {
        $http
            .get('/countries')
            .then(function (res) {
                observer.onNext(res);
                // Complete only after a successful response
                observer.onCompleted();
            }, function (err) {
                observer.onError(err);
            });
    })
    .shareReplay();

Now the first time I subscribe a new "listener" to the subject, the observable is pulled. Any later subscribers get the cached value without touching the server again.

So inside my "consumer" (Angular Directive) I would like to do something like this:

// ... countryInput directive code:

COUNTRIES_LOADED.subscribe(function (response) {
    // Fill in countries into scope or ctrl
    scope.countries = response.countries;
});

Any future subscribers to the COUNTRIES_LOADED observable MUST NOT trigger an $http request. Likewise, if the directive is never included on the page, $http will never get called.

The solution above works; however, I am not aware of the potential drawbacks and memory implications of this approach. Is this a valid solution? Is there a better / more appropriate way to achieve this using RxJS?

Many thanks!

4
Your solution looks fine, I don't think that you can come up with something better than this. - Estus Flask

4 Answers

7
votes

Use Rx.Observable.fromPromise(promise)

fromPromise:

Converts a Promises/A+ spec compliant Promise and/or ES2015 compliant Promise or a factory function which returns said Promise to an Observable sequence.

example:

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
  function (x) {
    console.log('Next: %s', x);
  },
  function (err) {
    console.log('Error: %s', err);
  },
  function () {
    console.log('Completed');
  });
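
One caveat for the question's laziness requirement: a promise starts its work as soon as it is created, so wrapping an already-created promise cannot delay the request until the first subscriber. The factory-function form mentioned in the description above avoids this. Here is a minimal plain-JavaScript sketch of the difference, with no RxJS involved; fakeGet and requestCount are hypothetical stand-ins for $http.get and the server:

```javascript
// Hypothetical stand-in for $http.get('/countries'); counts each "request".
let requestCount = 0;
function fakeGet() {
  requestCount += 1;
  return Promise.resolve({ countries: ['NL', 'BE'] });
}

// Eager: creating the promise already performs the request,
// whether or not anything ever subscribes to it.
const eagerPromise = fakeGet();
const countAfterEager = requestCount;

// Lazy: a factory only describes how to make the request.
// Nothing happens until some consumer decides to call it.
const promiseFactory = () => fakeGet();
const countAfterFactory = requestCount;

console.log(countAfterEager, countAfterFactory); // 1 1: only the eager call hit the "server"
```

So if the request must not fire before the first subscription, hand over the factory rather than the promise itself.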

update

In RxJS 6, the equivalent method is from.

4
votes

I found the answer here (the question is just named slightly differently): "rxjs using promise only once on subscribe".

So for my example the answer is as simple as:

var loadCountries = function () { return $http.get('/countries'); };

var observable = Rx.Observable.defer(loadCountries).shareReplay();
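
Why this works: defer does not call loadCountries until something subscribes, and shareReplay keeps the result for later subscribers. The following library-free sketch models just that behaviour in plain JavaScript. It is not RxJS's implementation; deferShared, fakeLoadCountries and requestCount are hypothetical names introduced for illustration.

```javascript
// Library-free model of defer + shareReplay semantics (hypothetical helper,
// not RxJS's actual implementation).
function deferShared(factory) {
  let cached = null; // the single promise shared by every subscriber

  return {
    subscribe(onNext, onError) {
      if (cached === null) {
        // First subscriber: invoke the factory now, and only now.
        // Wrapping in a Promise turns a synchronous throw into a rejection.
        cached = new Promise((resolve) => resolve(factory()));
      }
      // Later subscribers reuse the cached promise, so no new request is made.
      return cached.then(onNext, onError);
    },
  };
}

// Stand-in for $http.get('/countries'); counts how often it is invoked.
let requestCount = 0;
function fakeLoadCountries() {
  requestCount += 1;
  return Promise.resolve({ countries: ['NL', 'BE'] });
}

const countriesLoaded = deferShared(fakeLoadCountries);
// No subscriber yet, so the factory has not been called.
const callsBeforeSubscribe = requestCount;

const received = [];
countriesLoaded.subscribe((res) => received.push(res.countries));
countriesLoaded.subscribe((res) => received.push(res.countries));
// Two subscribers, but fakeLoadCountries has been invoked exactly once.
```

If the page never subscribes, fakeLoadCountries is never invoked, which mirrors the "directive not on the page" case from the question.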
4
votes

update

As of RxJS 6 you can use from()

Did you try the fromPromise() API of RxJS 5?

Check its documentation here!

1
votes

This is how you can use Observables. Let's say you have a method called getUser(username).

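// Returns an Observable.
// Note: .map() assumes an Observable-returning HTTP client such as Angular 2's Http;
// AngularJS's $http returns a promise, which has no .map().
getUser(username){
    return $http.get(url)
        .map(res => res.json());
}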

And you can use it as below

getUser(username).subscribe(res => console.log(res));

But if you want to use promises:

// Returns a Promise
// Don't forget to import the toPromise operator
getUser(username){
    return $http.get(url)
        .map(res => res.json())
        .toPromise();
}

And you can use it as below

getUser(username).then(res => console.log(res));