I'm making use of the withLatestFrom operator in RxJS in the normal way:
var combined = source1.withLatestFrom(source2, source3);
...to actively collect the most recent emission from source2 and source3 and to emit all three values only when source1 emits.
But I cannot guarantee that source2 or source3 will have produced values before source1 produces a value. Instead, I need to wait until all three sources have produced at least one value each before letting withLatestFrom do its thing.
The contract needs to be: if source1 emits, then combined will always eventually emit once the other sources finally produce. If source1 emits multiple times while waiting for the other sources, we can use the latest value and discard the previous values.

Edit: as a marble diagram:
--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--

--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--

------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--

I can make a custom operator for this, but I want to make sure I'm not missing an obvious way to do this using the vanilla operators. It feels almost like I want combineLatest for the initial emit and then to switch to withLatestFrom from then on, but I haven't been able to figure out how to do that.
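To pin down the contract independently of any operator choice, here is a rough sketch of it in plain JavaScript with no RxJS involved. The helper name makeCombiner and its source/other methods are hypothetical and exist only for this illustration; it models the desired behaviour shown in the marble diagrams, not how RxJS implements anything.

```javascript
// Hypothetical helper (not part of RxJS): models the desired contract.
// otherCount: number of secondary sources; emit: callback receiving [source, ...others].
function makeCombiner(otherCount, emit) {
  var latestOthers = new Array(otherCount);     // most recent value per other source
  var seen = new Array(otherCount).fill(false); // which other sources have produced
  var pending = null;                           // latest source value held while waiting

  function ready() {
    return seen.every(Boolean);
  }

  return {
    // Call when the primary source produces a value.
    source: function (value) {
      if (ready()) {
        emit([value].concat(latestOthers));
      } else {
        pending = { value: value }; // keep only the latest, discard earlier ones
      }
    },
    // Call when the other source at `index` produces a value.
    other: function (index, value) {
      latestOthers[index] = value;
      seen[index] = true;
      if (pending && ready()) {
        var held = pending;
        pending = null;
        emit([held.value].concat(latestOthers));
      }
    }
  };
}

// Replays the first marble diagram.
var out = [];
var c = makeCombiner(2, function (values) { out.push(values.join('')); });
c.source(1);
c.other(0, 'a');
c.other(1, 'x'); // all sources have produced: emits 1ax
c.other(0, 'b');
c.other(1, 'y');
c.source(2);     // emits 2by
console.log(out); // [ '1ax', '2by' ]
```

The held value is wrapped in an object so that a source value of null or undefined is still treated as pending.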
Edit: Full code example from final solution:
var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');
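// publish(selector) shares a single subscription to each source
// between the two pipelines built inside the selector.
var combined = source1.publish(function(s1) {
  return source2.publish(function(s2) {
    return source3.publish(function(s3) {
      // First emission: wait until all three sources have produced a value.
      var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
      // Later emissions: skip the first source1 value (cL covers it) and
      // sample the others only when source1 emits.
      var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));
      return Rx.Observable.merge(cL, wLF);
    });
  });
});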
var sub1 = combined.subscribe(x => console.log('x', x));
// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');
// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]
// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]
zip? – Konrad Garus
zip won't do because all three sources will produce at wildly different rates. The use case is that source1 is usually a human interaction, like a click, and the other sources are fetching data from various sources: ajax response, SSE stream, etc. – whiteinge
join seems to be pretty close then: reactivex.io/documentation/operators/join.html – Konrad Garus