Old question, but I ran into this same issue. Here's my stab at it. First, the non-working version:
Observable<Integer> emitsMany = Observable.range( 1, 10 )
    .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
    .doOnNext( i -> System.out.println( "produced " + i ));
Observable<Boolean> emitsOne = Observable.just( true )
    .delay( 3, TimeUnit.SECONDS )
    .doOnNext( b -> System.out.println( "produced " + b ));
Observable.combineLatest(
        emitsMany, emitsOne,
        ( i, b ) -> "consumed " + i + " " + b )
    .blockingSubscribe( System.out::println );
Sure enough, the first couple of emissions from emitsMany get dropped, because combineLatest emits nothing until every source has produced at least one value:
produced 1
produced 2
produced 3
produced true
consumed 3 true
produced 4
consumed 4 true
. . .
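To make the dropping behavior concrete, here is a rough plain-Java model of what combineLatest does with the arrivals in the log above. It does not use RxJava at all; the class CombineLatestModel, the Event type, and sampleArrivals are names I made up for illustration, and the arrival order is assumed from the log rather than measured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Teaching sketch only: models combineLatest semantics, not RxJava's implementation.
class CombineLatestModel {

    // One emission, tagged with the source it came from.
    static final class Event {
        final boolean fromMany;   // true: emitsMany, false: emitsOne
        final Object value;

        Event(boolean fromMany, Object value) {
            this.fromMany = fromMany;
            this.value = value;
        }
    }

    // Walks the arrivals in order and combines only once both sides have a value.
    static List<String> combineLatest(List<Event> arrivals) {
        Object latestMany = null;
        Object latestOne = null;
        List<String> out = new ArrayList<>();
        for (Event e : arrivals) {
            if (e.fromMany) {
                latestMany = e.value;   // overwrites any earlier, unconsumed value
            } else {
                latestOne = e.value;
            }
            if (latestMany != null && latestOne != null) {
                out.add("consumed " + latestMany + " " + latestOne);
            }
        }
        return out;
    }

    // Assumed arrival order, taken from the log: 1, 2, 3, then true, then 4 and 5.
    static List<Event> sampleArrivals() {
        return Arrays.asList(
            new Event(true, 1), new Event(true, 2), new Event(true, 3),
            new Event(false, true),
            new Event(true, 4), new Event(true, 5));
    }

    public static void main(String[] args) {
        combineLatest(sampleArrivals()).forEach(System.out::println);
    }
}
```

The values 1 and 2 are overwritten in latestMany before emitsOne has produced anything, so the first combination is for 3.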
I think this is the fix. First, we need to wrap emitsOne in something that keeps returning the most recently observed value without delay. I don't know of an operator that does this, but BehaviorSubject does the job.
Next, we can use concatMap on emitsMany with a nested take(1) Observable, so that each item waits until the subject has a value and is then paired with it:
Observable<Integer> emitsMany = Observable.range( 1, 10 )
    .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
    .doOnNext( i -> System.out.println( "produced " + i ));
Observable<Boolean> emitsOne = Observable.just( true )
    .delay( 3, TimeUnit.SECONDS )
    .doOnNext( b -> System.out.println( "produced " + b ));
// Forward only onNext: once a BehaviorSubject has completed, it no longer replays its last value.
BehaviorSubject<Boolean> emitsOneSubject = BehaviorSubject.create();
emitsOne.subscribe( emitsOneSubject::onNext );
// Each item from emitsMany waits for the subject's current (or first) value.
emitsMany.concatMap( i -> emitsOneSubject
        .take( 1 )
        .map( b -> "consumed " + i + " " + b ))
    .blockingSubscribe( System.out::println );
We now get all the combinations:
produced 1
produced 2
produced true
consumed 1 true
consumed 2 true
produced 3
consumed 3 true
produced 4
consumed 4 true
produced 5
consumed 5 true
produced 6
consumed 6 true
produced 7
consumed 7 true
produced 8
consumed 8 true
produced 9
consumed 9 true
produced 10
consumed 10 true
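For comparison, here is a similar plain-Java sketch of what the concatMap plus take(1) version does: items from emitsMany queue up until emitsOne has a value, then each is paired with the latest one. Again, no RxJava is involved; WaitForFirstModel and pairWithLatest are hypothetical names, and the arrival order in main is assumed from the log above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Teaching sketch only: models the queue-then-pair behavior, not RxJava internals.
class WaitForFirstModel {

    // Integer entries stand for emitsMany items, Boolean entries for emitsOne items.
    static List<String> pairWithLatest(List<Object> arrivals) {
        Queue<Integer> pending = new ArrayDeque<>();  // items waiting for a partner value
        Boolean latestOne = null;                     // plays the role of the BehaviorSubject
        List<String> out = new ArrayList<>();
        for (Object a : arrivals) {
            if (a instanceof Boolean) {
                latestOne = (Boolean) a;
            } else {
                pending.add((Integer) a);
            }
            // Drain in order as soon as a partner value exists, like take(1) per item.
            while (latestOne != null && !pending.isEmpty()) {
                out.add("consumed " + pending.remove() + " " + latestOne);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed arrival order: 1 and 2 arrive before true, 3 and 4 after.
        pairWithLatest(Arrays.<Object>asList(1, 2, true, 3, 4))
            .forEach(System.out::println);
    }
}
```

Nothing is overwritten here: 1 and 2 wait in the queue and are consumed as soon as true arrives, which matches the order in the log.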
a, with sequence b? Or do you want to zip them? reactivex.io/documentation/operators/zip.html - flakes
startWith operator in your Observable 2, so that you will not lose any values. Then you will get like, A1 - Empty, A2 - B1.. Then all subsequent emissions will merge with latest one from other observable. - Bharath Mg