9
votes

Background

I have a number of RxJava Observables (either generated from Jersey clients, or stubs using Observable.just(someObject)). All of them should emit exactly one value. I have a component test that mocks out all the Jersey clients and uses Observable.just(someObject), and I see the same behaviour there as when running the production code.

I have several classes that act upon these observables, perform some calculations (plus some side-effects, which I might turn into direct return values later) and return empty Observable<Void> instances.

At one point, in one such class, I'm trying to zip several of my source observables up and then map them - something like the below:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());
}

// in UnifyingObject
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

The calculating classes are then all combined and waited on:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

The problem

The trouble is, processToNewObservable() is never being executed. By process of elimination, I can see it's getObservable1() that's the trouble - if I replace it with Observable.just(null), everything's executed as I'd imagine (but with a null value where I want a real one).

To reiterate, getObservable1() returns an Observable from a Jersey client in production code, but that client is a Mockito mock returning Observable.just(someValue) in my test.

Investigation

If I convert getObservable1() to blocking, then wrap the first value in just(), again, everything executes as I'd imagine (but I don't want to introduce the blocking step):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable());

My first thought was that perhaps something else was consuming the value emitted from my observable, and zip was seeing that it was already complete, thus determining that the result of zipping them should be an empty observable. I've tried adding .cache() to every observable source I can think of that might be related, but that hasn't altered the behaviour.

I've also tried adding next / error / complete / finally handlers on getObservable1 (without converting it to blocking) just before the zip, but none of them executed either:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable());

The question

I'm very new to RxJava, so I'm pretty sure I'm missing something fundamental. The question is: what stupid thing could I be doing? If that's not obvious from what I've said so far, what can I do to help diagnose the issue?

3
Are any of the getObservable methods asynchronous? It is possible your test completes before the zip can even collect all the data. You can try adding toBlocking after the concatMap to see whether that works. - akarnokd
Nothing is asynchronous in the test - it's all just Observable.just(). Regardless, something collects the zipped & mapped observable and calls toBlocking on it eventually. Just to be sure, I've tried adding toBlocking().first() after concatMap and wrapping that with Observable.just() (in order to get an Observable, not BlockingObservable, to satisfy an interface); trying that throws a NoSuchElementException: Sequence contains no elements - Rowan

3 Answers

1
votes

The Observable must emit to start the chain. You have to think of your pipeline as a declaration of what will happen when the Observable emits.

You didn't share what was actually being observed, but Observable.just() causes the Observable to emit the wrapped object as soon as it is subscribed to.
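
To make the "declaration" point concrete, here is a minimal sketch in plain Java rather than RxJava. ColdSource and its methods are hypothetical stand-ins, not RxJava types. The only assumption carried over is that, like RxJava operators, doOnNext returns a new object and nothing runs until subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a cold single-value source (NOT an RxJava class).
class ColdSource<T> {
    private final T value;
    private final List<Consumer<T>> hooks;

    private ColdSource(T value, List<Consumer<T>> hooks) {
        this.value = value;
        this.hooks = hooks;
    }

    static <T> ColdSource<T> just(T value) {
        return new ColdSource<>(value, new ArrayList<>());
    }

    // Returns a NEW source with the hook attached; the receiver is left unchanged.
    ColdSource<T> doOnNext(Consumer<T> hook) {
        List<Consumer<T>> copy = new ArrayList<>(hooks);
        copy.add(hook);
        return new ColdSource<>(value, copy);
    }

    // Only a subscription makes anything run: hooks first, then the subscriber.
    void subscribe(Consumer<T> onNext) {
        for (Consumer<T> hook : hooks) {
            hook.accept(value);
        }
        onNext.accept(value);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ColdSource<String> source = ColdSource.just("x");

        source.doOnNext(v -> log.add("hook " + v)); // result discarded, never subscribed
        source.subscribe(v -> log.add("got " + v)); // subscribes to the undecorated source

        System.out.println(log); // prints [got x]
    }
}
```

This mirrors the snippet in the question where the doOn... handlers are attached to one getObservable1() result that is then discarded while zip receives a fresh one. As written there, those handlers would never be subscribed to, whatever the source emits.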

0
votes

Based on the response in the comment, either one of the getObservable methods completes without emitting a value, or the Mockito mock is misbehaving. The following standalone example works for me. Could you check it and start slowly mutating it to see where things break?

Observable.zip(
        Observable.just(1),
        Observable.just(2),
        Observable.just(3),
        (a, b, c) -> new Integer[] { a, b, c })
 .concatMap(a -> Observable.from(a))
 .subscribe(System.out::println);

0
votes

Note: I didn't find my answer here very satisfying, so I dug in a bit further and found a much smaller reproduction case, so I've asked a new question here: Why does my RxJava Observable emit only to the first consumer?


I've figured out at least part of my troubles (and, apologies to all who tried to answer, I don't think you stood much of a chance given my explanation).

The various classes which perform these calculations were all returning Observable.empty() (as per processToNewObservable() in my original example). As far as I can tell, Observable.zip() doesn't subscribe to the Nth observable it's zipping until the (N-1)th observable has emitted a value.

My original example claimed it was getObservable1() that was misbehaving. That was slightly inaccurate: it was a later observable in the parameter list. As I understand it, converting it to blocking and then wrapping the value in an Observable again worked because calling toBlocking().first() forced its execution, so I got the side-effects I wanted.

If I change all my calculating classes to return Observable.just(null) instead, everything works: the final zip() of all the calculation classes' observables works through them all, so all the expected side-effects happen.
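
For anyone who wants to see the difference without RxJava on the classpath, here is a rough plain-Java model of the behaviour I observed. ZipModel, empty, just and zipCount are all hypothetical names, and this is a simplification of what I saw, not RxJava's actual implementation. The assumption is that zip works through its sources in order and stops as soon as one completes without a value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified model of the observed zip behaviour (NOT RxJava code).
class ZipModel {
    // Records which sources actually ran, standing in for their side-effects.
    static final List<String> log = new ArrayList<>();

    // A cold source that completes without emitting, like Observable.empty().
    static Supplier<List<Object>> empty(String name) {
        return () -> {
            log.add(name);
            return Collections.emptyList();
        };
    }

    // A cold source that emits exactly one (possibly null) value, like Observable.just(value).
    static Supplier<List<Object>> just(String name, Object value) {
        return () -> {
            log.add(name);
            return Collections.singletonList(value);
        };
    }

    // Runs the sources in order and returns how many combined rows the zip would emit.
    @SafeVarargs
    static int zipCount(Supplier<List<Object>>... sources) {
        if (sources.length == 0) {
            return 0;
        }
        int rows = Integer.MAX_VALUE;
        for (Supplier<List<Object>> source : sources) {
            List<Object> items = source.get(); // "subscribe" to this source
            rows = Math.min(rows, items.size());
            if (rows == 0) {
                return 0; // an empty source ends the zip; later sources never run
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        int withEmpty = zipCount(empty("calc1"), just("calc2", null));
        System.out.println(withEmpty + " " + log); // prints 0 [calc1]

        log.clear();
        int withJust = zipCount(just("calc1", null), just("calc2", null));
        System.out.println(withJust + " " + log); // prints 1 [calc1, calc2]
    }
}
```

In the first call the empty source ends the zip before calc2 runs, which matches the missing side-effects I was seeing. In the second, every source emits one (null) value, so all of them run and one combined result comes out.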

Returning a null Void seems like I'm definitely Doing Something Wrong from a design point of view, but at least this particular question is answered.