1
votes

Coming from the Promise world, I can implement a queue function that returns a Promise that won't execute until the previous Promise resolves.

var promise = Promise.resolve();
var i = 0;

function promiseQueue() {
  return promise = promise.then(() => {
    return Promise.resolve(++i);
  });
}

promiseQueue().then(result => {
  console.log(result); // 1
});
promiseQueue().then(result => {
  console.log(result); // 2
});
promiseQueue().then(result => {
  console.log(result); // 3
});
// -> 1, 2, 3

I'm trying to recreate this queue-like function using Observables.

var obs = Rx.Observable.of(undefined);
var j = 0;

function obsQueue() {
  return obs = obs.flatMap(() => {
    return Rx.Observable.of(++j);
  });
}

obsQueue().subscribe(result => {
  console.log(result); // 1
});
obsQueue().subscribe(result => {
  console.log(result); // 3
});
obsQueue().subscribe(result => {
  console.log(result); // 6
});
// -> 1, 3, 6

Every time I subscribe, the whole history of the Observable is re-executed: at the time of subscription the "current Observable" is a chain of every flatMap added so far, and each new subscription runs all of those stages again. A Promise, by contrast, runs once and just waits until the last execution has completed.
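To make the re-execution concrete, here is a minimal sketch that reproduces the 1, 3, 6 output without RxJS. The `cold` helper is a hypothetical stand-in I wrote for illustration, not the RxJS implementation; it only assumes that a cold Observable re-runs its producer for every subscriber.

```javascript
// Hypothetical stand-in for a cold Observable (NOT the RxJS implementation):
// the producer function runs again for every subscriber.
function cold(producer) {
  return {
    subscribe(next) { producer(next); },
    // flatMap: for each upstream value, subscribe to the Observable
    // returned by project and forward its values downstream.
    flatMap(project) {
      return cold(next => this.subscribe(value => project(value).subscribe(next)));
    }
  };
}

var obs = cold(next => next(undefined));
var j = 0;

function obsQueue() {
  // Same shape as the question: every call appends one more flatMap stage.
  return obs = obs.flatMap(() => cold(next => next(++j)));
}

var seen = [];
obsQueue().subscribe(result => seen.push(result)); // 1 stage runs:  j -> 1
obsQueue().subscribe(result => seen.push(result)); // 2 stages run:  j -> 2, 3
obsQueue().subscribe(result => seen.push(result)); // 3 stages run:  j -> 4, 5, 6

console.log(seen); // [ 1, 3, 6 ]
```

Each subscriber only sees the value from the last stage, but every earlier stage still runs and increments j on the way.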

flatMap isn't the answer for this use case, and nearly all the "chain" and "queue" answers I can find online are about chaining several Observables that are part of one overall Observable, where flatMap is the correct answer.

How can I go about creating the above Promise queue function using Observables?

For context, this queue function is being used in a dialog service, which dictates only one dialog can be shown at a time. If multiple calls are made to show different dialogs, they only appear one at a time in the order that they were called.

2
So you want to take the next item from the observable each time? Have you tried .take(1)? - jonrsharpe
I want to wait until complete() is called on the previous Observable's observer before creating a new one. I've played around with finally() but haven't found a way to make it work. - hotforfeature
If you need a specific order, concatMap should be used. The inner Observable could use a Subject that emits when the notification should trigger the next one. - LookForAngular

2 Answers

0
votes

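If you replace:

return obs = obs.flatMap...

with

return obs.flatMap...

you will see the same output as you do with promises (1, 2, 3), because each call then builds on the original single-value Observable instead of the ever-growing chain. Note that this alone does not make a later Observable wait for an earlier one.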

To chain Observables so that the next one is not subscribed to until the previous one completes, use the concat operator:

let letters$ = Rx.Observable.from(['a','b','c']);
let numbers$ = Rx.Observable.from([1,2,3]);
let romans$ = Rx.Observable.from(['I','II','III']);

letters$.concat(numbers$).concat(romans$).subscribe(e=>console.log(e));
//or...
Rx.Observable.concat(letters$,numbers$,romans$).subscribe(e=>console.log(e));
// results (each value is logged on its own line):
// a   b   c   1   2   3   I   II   III


-1
votes

Figured it out! May not be quite as elegant as the Promise chain, and I'm definitely open to suggestions to clean it up.

var trigger = undefined;
function obsQueue() {
  if (!trigger || trigger.isStopped) {
    trigger = new Rx.Subject();
    return createObservable(trigger);
  } else {
    var lastTrigger = trigger;
    var newTrigger = trigger = new Rx.Subject();
    return lastTrigger.last().mergeMap(() => {
      return createObservable(newTrigger);
    });
  }
}

var j = 0;
function createObservable(trigger) {
  // In my use case, this creates and shows a dialog and returns an 
  // observable that emits and completes when an option is selected. 
  // We want to make sure we only create the next dialog when the previous
  // one is closed.
  console.log('creating');
  return Rx.Observable.of(++j).finally(() => {
    trigger.next();
    trigger.complete();
  });
}

obsQueue().subscribe(result => {
  console.log('first', result);
});
obsQueue().subscribe(result => {
  console.log('second', result);
});
obsQueue().subscribe(result => {
  console.log('third', result);
});
var timer = setTimeout(() => {
  obsQueue().subscribe(result => {
    console.log('fourth', result);
  });
}, 1000);

// Output:
// creating
// first 1
// creating
// second 2
// creating
// third 3
// creating
// fourth 4

Rather than trying to work out how to chain the Observables in order, I have each Observable create its own trigger, which tells the next Observable when to create itself.

If every trigger has already completed (the setTimeout case, where another dialog is queued later), the queue simply starts again.
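For anyone who wants to see the hand-off on its own, here is a rough plain-JavaScript model of the same idea, with no RxJS involved. Everything in it (`enqueue`, `runNext`, `dialog`, `closers`) is a hypothetical name made up for this sketch; it only assumes that each dialog is given a `done` callback to call when it closes.

```javascript
// Hypothetical plain-JS model of the trigger hand-off (not RxJS).
var pending = [];   // dialog factories waiting for their turn
var busy = false;   // true while a dialog is open
var log = [];

function enqueue(create) {
  pending.push(create);
  if (!busy) runNext(); // queue is idle, so start (or restart) it
}

function runNext() {
  var create = pending.shift();
  if (!create) { busy = false; return; } // nothing left: go idle
  busy = true;
  create(runNext); // the dialog calls done() (= runNext) when it closes
}

// Simulated dialogs: "closing" one is what triggers the next.
var closers = [];
function dialog(name) {
  return done => {
    log.push('open ' + name);
    closers.push(() => { log.push('close ' + name); done(); });
  };
}

enqueue(dialog('A'));
enqueue(dialog('B'));   // B is not created yet, A is still open
closers.shift()();      // close A, which opens B
closers.shift()();      // close B, queue goes idle
enqueue(dialog('C'));   // like the setTimeout case: the queue starts again

console.log(log);
// [ 'open A', 'close A', 'open B', 'close B', 'open C' ]
```

The `busy` flag plays the role of the `trigger.isStopped` check above: when nothing is open, the next call starts a fresh queue.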