20
votes

I'm making use of the withLatestFrom operator in RxJS in the normal way:

var combined = source1.withLatestFrom(source2, source3);

...to actively collect the most recent emission from source2 and source3 and to emit all three value only when source1 emits.

But I cannot guarantee that source2 or source3 will have produced values before source1 produces a value. Instead I need to wait until all three sources produce at least one value each before letting withLatestFrom do its thing.

The contract needs to be: if source1 emits then combined will always eventually emit when the other sources finally produce. If source1 emits multiple times while waiting for the other sources we can use the latest value and discard the previous values. Edit: as a marble diagram:

--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--


--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--


------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--

I can make a custom operator for this, but I want to make sure I'm not missing an obvious way to do this using the vanilla operators. It feels almost like I want combineLatest for the initial emit and then to switch to withLatestFrom from then on but I haven't been able to figure out how to do that.

Edit: Full code example from final solution:

var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');

var combined = source1.publish(function(s1) {
    return source2.publish(function(s2) {
        return source3.publish(function(s3) {
            var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
            var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));

            return Rx.Observable.merge(cL, wLF);
        });
    });
});

var sub1 = combined.subscribe(x => console.log('x', x));

// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');

// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]

// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]
5
What's the use case? Are you looking for zip?Konrad Garus
zip won't do because all three sources will produce at wildly different rates. Use-case is source1 is usually a human interaction, like a click, and the other sources are fetching data from various sources -- ajax response, SSE stream, etc.whiteinge
I see. join seems to be pretty close then: reactivex.io/documentation/operators/join.htmlKonrad Garus
did you try with using withLatestFrom only? According to the docs and the behaviour I'm observing the withLatestFrom method actually does wait for every source to produce. Your full set of requirements is met by only using withLatestFrom. Looking at it from another perspective: the publish-nesting is an interesting way to handle operator switching problems.Herman
In my experience withLatestFrom does wait for every source to produce, but that means it skips any events which occurred before that state happened. I'm experiencing a similar issue to the OP, where S1 emits most or all of its values before S2 emits one; I want it to wait for S2 and then emit for each S1 value with that value, but it just skips all by the last S1 value.taxilian

5 Answers

9
votes

I think the answer is more or less as you described, let the first value be a combineLatest, then switch to withLatestFrom. My JS is hazy, but I think it would look something like this:

var selector = function(x,y,z) {};

var combined = Rx.Observable.concat(
    source1.combineLatest(source2, source3, selector).take(1),
    source1.withLatestFrom(source2, source3, selector)
);

You should probably use publish to avoid multiple subscriptions, so that would look like this:

var combined = source1.publish(function(s1)
{
    return source2.publish(function(s2)
    {
        return source3.publish(function(s3)
        {
            return Rx.Observable.concat(
                s1.combineLatest(s2, s3, selector).take(1),
                s1.withLatestFrom(s2, s3, selector)
            );
        });
    });
});

or using arrow functions...

var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.concat(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.withLatestFrom(s2, s3, selector)
    )
)));

EDIT:

I see the problem with concat, the withLatestFrom isn't getting the values. I think the following would work:

var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.merge(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.skip(1).withLatestFrom(s2, s3, selector)
    )
)));

...so take one value using combineLatest, then get the rest using withLatestFrom.

6
votes

I wasn't quite satisfied with the accepted answer, so I ended up finding another solution. Many ways to skin a cat!

My use-case involves just two streams - a "requests" stream and a "tokens" stream. I want requests to fire as soon as they are received, using the whatever the latest token is. If there is no token yet, then it should wait until the first token appears, and then fire off all the pending requests.

I wasn't quite satisfied with the accepted answer, so I ended up finding another solution. Essentially I split the request stream into two parts - before and after first token arrives. I buffer the first part, and then re-release everything in one go once I know that the token stream is non-empty.

const first = token$.first()

Rx.Observable.merge(
  request$.buffer(first).mergeAll(),
  request$.skipUntil(first)
)
  .withLatestFrom(token$)

See it live here: https://rxviz.com/v/VOK2GEoX

2
votes

I had similar requirements but for just 2 observables. I ended up using switchMap+first:

observable1
 .switchMap(() => observable2.first(), (a, b) => [a, b])
 .subscribe(([a, b]) => {...}));

So it:

  • waits until both observables emit some value
  • pulls the value from second observable only if the first one has changed (unlike combineLatest)
  • doesn't hang subscribed on second observable (because of .first())

In my case, second observable is a ReplaySubject. I'm not sure if it will work with other observable types.

I think that:

  • flatMap would probably work too
  • it might be possible to extend this approach to handle more than 2 observables

I was surprised that withLatestFrom will not wait on second observable.

1
votes

Use combineLatest and filter to remove tuples before first full set is found then set a variable to stop filtering. The variable can be within the scope of a wrapping defer to do things properly (support resubscription). Here it is in java (but the same operators exist in RxJs):

Observable.defer(
    boolean emittedOne = false;
    return Observable.combineLatest(s1, s2, s3, selector)
        .filter(x -> {
            if (emittedOne) 
               return true;
            else {
               if (hasAll(x)) {
                   emittedOne = true;
                   return true;
               } else 
                   return false; 
            }
        });
)
-3
votes

Actually withLatestFrom already

  • waits for every source
  • emits only when source1 emits
  • remembers only the last source1-message while the other sources are yet to start
// when source 1 emits the others have emitted already
var source1 = Rx.Observable.interval(500).take(7)
var source2 = Rx.Observable.interval(100, 300).take(10)
var source3 = Rx.Observable.interval(200).take(10)

var selector = (a,b,c) => [a,b,c]
source1
  .withLatestFrom(source2, source3, selector)
  .subscribe()

vs

// source1 emits first, withLatestFrom discards 1 value from source1
var source1 = Rx.Observable.interval(500).take(7)
var source2 = Rx.Observable.interval(1000, 300).take(10)
var source3 = Rx.Observable.interval(2000).take(10)

var selector = (a,b,c) => [a,b,c]
source1
  .withLatestFrom(source2, source3, selector)
  .subscribe()