2
votes

I am relatively new to RxJava, and I have been playing around with operators for a while.

I saw this small example that emits items after short intervals (1s):

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
        return d + " " + t;
    }).toBlocking().forEach(System.out::println);

This works, but when I remove toBlocking() that turns the source into a BlockingObservable, the program executes and ends with no output.

I usually look at the marble diagrams to understand things properly: http://reactivex.io/documentation/operators/zip.html

In the last sentence it says: It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items.

Does that mean, the data Observable emits all items in less than 1 second and ends before printing the first two items from each Observable? Because each Observable is asynchronous by itself?

I need a clear understanding of what's happening, and if there are other ways to deal with similar cases. Anyone?

2

2 Answers

1
votes

Essentially your main program is exiting before the observable has a chance to emit anything, that is the reason why you are not seeing any output. The way to fix it will be to block in some way until the Observable has emitted all items, here is one way using a CountDownLatch:

CountDownLatch latch = new CountDownLatch(1);
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
    return d + " " + t;
}).finallyDo(latch::countDown).forEach(System.out::println);

latch.await(10, TimeUnit.SECONDS);
1
votes

Observable.interval uses a Scheduler under the covers. It will be emitting from another thread. Meanwhile, the main thread has done all its composing and will exit. Presumably you have this code in a main method, which is why your program is exiting.

In a real system this shouldn't be a problem (unless your real system is a main method with this code in it).

In an example program, you can cause the main thread to block by reading a byte from stdin. Something like this:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
        .subscribe(System.out::println);

System.in.read();