3
votes

I have two independent Observables that I'd like to couple in such a manner that every time the first emits a value, it causes the second to emit a value.

In this scenario, the first Observable is an infinite sequence based on an async Node.js-style method with variable callback timing (no usage of delay or timer allowed). The second Observable is a finite sequence of independent values.

Perhaps a diagram can express what I'm looking for:

I: ---A--D-----Q-G---B-> (I is infinite)
F: -3512-|->             (F is finite)
O: ---3--5-----1-2-|->   (O is finite)

The output sequence (O) is finite with the values from (F), based on the timing of (I).

Here's (I):

// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const infiniteCallbackSequence = Rx.Observable
  .defer(callbackObservable)
  .expand(x => callbackObservable());

And for simplicity, here's (F):

const finiteSequence = Rx.Observable.from([3, 5, 1, 2]);

(This build upon a previous Question of mine here.)

I don't understand things well enough to make combine*, merge*, or *map work to generate what I want -- assuming they can. It seems like I want take(1) on (I) each time (O) emits.

How do I get the behavior described?

1
the | does not seem to match an event from I, is it normal?njzk2
I apologize, @njzk2. Those pipe symbols merely represent "end of sequence". I think the marble diagrams on the main RxJS documentation use 'X' to denote such, but that looked more confusing in my ASCII diagram above.David Aldridge

1 Answers

1
votes

Would Observable.zip work for your usecase?

const Rx = require('rxjs');
const timers = require('timers');

const finite = Rx.Observable.from([3, 5, 1, 2]);

// I'm stuck with this, it cannot change...
function asyncOperation(callback) {
  const delayMsec = Math.floor(Math.random() * 1000);
  timers.setTimeout(callback, delayMsec, null, delayMsec);
}

const callbackObservable = Rx.Observable.bindNodeCallback(asyncOperation);

const infiniteCallbackSequence = Rx
  .Observable
  .defer(callbackObservable)
  .expand(x => callbackObservable());

const combined = Rx.Observable.zip(
  finite,
  infiniteCallbackSequence
).forEach(v => console.log(v));

running on node 6:

 $ node test.js
[ 3, 816 ]
[ 5, 297 ]
[ 1, 95 ]
[ 2, 677 ]
 $