2
votes

I want to combine two observables: one emits n items and the other emits only one.

combineLatest waits until both observables have emitted at least one item and then combines the latest emitted items until both observables have finished. Consider the following chronological order:

  • Observable A -> emits result A1
  • Observable A -> emits result A2
  • Observable B -> emits result B1

combineLatest will only combine A2 with B1, so A1 is lost (this can easily be tested here: http://rxmarbles.com/#combineLatest).
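To make the dropped item concrete, here is a minimal plain-Java model of the combineLatest rule described above. It does not use RxJava; the class `CombineLatestModel` and the method `combineLatest` are hypothetical names, and the timeline is simply a list of labels where anything starting with "A" comes from observable A and everything else from observable B.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombineLatestModel {
    // Hypothetical helper: walks a timeline of "A..."/"B..." events and pairs
    // the latest A with the latest B, the way combineLatest is described above.
    static List<String> combineLatest(List<String> timeline) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (String event : timeline) {
            if (event.startsWith("A")) {
                latestA = event;
            } else {
                latestB = event;
            }
            // Nothing is emitted until both sources have produced a value.
            if (latestA != null && latestB != null) {
                out.add(latestA + "+" + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A emits twice before B: A1 is overwritten and never combined.
        System.out.println(combineLatest(Arrays.asList("A1", "A2", "B1"))); // [A2+B1]
        // B emits first: every A is combined.
        System.out.println(combineLatest(Arrays.asList("B1", "A1", "A2"))); // [A1+B1, A2+B1]
    }
}
```

The result depends on the arrival order, which is exactly the problem.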

What I need

I need to combine ALL items of two observables, no matter which one is faster. How can I do that?

Result should be (always, independent of which observable starts emitting items first!):

  • A1 combined with B1
  • A2 combined with B1
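To pin down the expected behaviour, here is a small plain-Java model of it (no RxJava; `PairAllWithSingle` and `pairAll` are hypothetical names used only for illustration). It assumes the same label convention as the list above: events starting with "B" are the single item, everything else comes from the observable that emits n items. Items that arrive before the single item are buffered and flushed once it is known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PairAllWithSingle {
    // Hypothetical helper: pairs every "A..." event with the single "B..." event,
    // buffering A values that arrive before B is known.
    static List<String> pairAll(List<String> timeline) {
        String single = null;
        List<String> pending = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (String event : timeline) {
            if (event.startsWith("B")) {
                single = event;
                // B has arrived: flush everything that was waiting for it.
                for (String a : pending) {
                    out.add(a + "+" + single);
                }
                pending.clear();
            } else if (single == null) {
                pending.add(event); // B not known yet, keep A for later
            } else {
                out.add(event + "+" + single);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same result whichever source emits first.
        System.out.println(pairAll(Arrays.asList("A1", "A2", "B1"))); // [A1+B1, A2+B1]
        System.out.println(pairAll(Arrays.asList("B1", "A1", "A2"))); // [A1+B1, A2+B1]
        System.out.println(pairAll(Arrays.asList("A1", "B1", "A2"))); // [A1+B1, A2+B1]
    }
}
```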
2
Do you mean that you want all permutations of some sequence a, with sequence b? Or do you want to zip them? reactivex.io/documentation/operators/zip.html - flakes
all permutations... One observable only emits 1 item, the other n items and I want to get n combinations (all n items, each one combined with the 1 item) - prom85
I adjusted my example to make that more clear... - prom85
Maybe try chaining the startWith operator onto your Observable 2 so that you don't lose any values. You would then get something like A1 - Empty, A2 - B1, and so on; all subsequent emissions will be combined with the latest one from the other observable. - Bharath Mg

2 Answers

1
votes

Old question, but I ran into this same issue. Here's my stab at it. First, the non-working version:

    Observable<Integer> emitsMany = Observable.range( 1, 10 )
            .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
            .doOnNext( i -> System.out.println( "produced " + i ));

    Observable<Boolean> emitsOne = Observable.just( true )
            .delay( 3, TimeUnit.SECONDS )
            .doOnNext( b -> System.out.println( "produced " + b ));

    Observable.combineLatest(
            emitsMany, emitsOne,
            ( i, b ) -> "consumed " + i + " " + b )
    .blockingSubscribe( System.out::println );

Sure enough, the first couple of emissions from emitsMany get dropped:

produced 1
produced 2
produced 3
produced true
consumed 3 true
produced 4
consumed 4 true
. . .

Here's what I think is the fix. First, we need to wrap emitsOne in something that keeps returning the previously observed value without delay. I don't know of an operator that does this, but BehaviorSubject does the job.

Next we can use concatMap with a nested take(1) Observable:

    Observable<Integer> emitsMany = Observable.range( 1, 10 )
            .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
            .doOnNext( i -> System.out.println( "produced " + i ));

    Observable<Boolean> emitsOne = Observable.just( true )
            .delay( 3, TimeUnit.SECONDS )
            .doOnNext( b -> System.out.println( "produced " + b ));

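    // Relay only onNext: a terminated BehaviorSubject no longer replays its
    // last value, so emitsOne's completion must not be forwarded.
    BehaviorSubject<Boolean> emitsOneSubject = BehaviorSubject.create();
    emitsOne.subscribe( emitsOneSubject::onNext );

    // For each item, wait for (or immediately get) the single value, then pair them.
    emitsMany.concatMap( i -> emitsOneSubject
            .take( 1 )
            .map( b -> "consumed " + i + " " + b ))
    .blockingSubscribe( System.out::println );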

We now get all the combinations:

produced 1
produced 2
produced true
consumed 1 true
consumed 2 true
produced 3
consumed 3 true
produced 4
consumed 4 true
produced 5
consumed 5 true
produced 6
consumed 6 true
produced 7
consumed 7 true
produced 8
consumed 8 true
produced 9
consumed 9 true
produced 10
consumed 10 true
0
votes

Note this is untested:

Create a ReplaySubject from observable sequence a. For each value that emits on sequence b combine that value with the replay subject to create a new observable of Pair<A, B>. Flat map these observables together and return that as the result.

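    // Pair is assumed to be a tuple type with a static of(a, b) factory,
    // e.g. org.apache.commons.lang3.tuple.Pair.
    public static <A, B> Observable<Pair<A, B>> permutation(
            Observable<A> observableA,
            Observable<B> observableB
    ) {
        // Record every item (and the terminal event) of A so it can be replayed per b.
        ReplaySubject<A> subjectA = ReplaySubject.create();
        observableA.subscribe(subjectA);
        // Replay all of A for each b and merge the resulting pair streams.
        return observableB.flatMap(b -> subjectA.map(a -> Pair.of(a, b)));
    }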