1
votes

I'm pretty new to RxJava. Whenever I need to pass the data returned by one observable down the chain, all the way to the call to 'subscribe', I have trouble understanding how to do it the 'Reactive' way without workarounds.

For example:

Observable<GameObject> obs1 = func1();
Observable<GameObject> obs2 = func2();
Observable<GameObject> obs3 = func3();
Observable<GameObject> obs4 = func4();

I would like to run obs1 and obs2, get their results, then run obs3, then obs4, and end the chain with subscribe while still having access to the results of obs1, obs2, obs3 and obs4.

The order of the calls is important: I need obs1 and obs2 to complete before obs3 is executed.

The same goes for obs3 and obs4: I need obs3 to complete before obs4 is executed.

How can I do that?

I know this question has probably been asked many times, but it is one of the most confusing issues when a developer starts learning RxJava.

Thanks.


2 Answers

4
votes

You can do it using Observable.zip and simple Observable.map/Observable.flatMap:

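Observable.zip(obs1, obs2, (res1, res2) -> {
    // do stuff with res1, res2
    // obs3 is only subscribed to after obs1 and obs2 have both emitted
    return obs3.flatMap(res3 -> {
        // do stuff with res1, res2, res3
        // map, not flatMap: the lambda returns a plain value, not an Observable
        return obs4.map(res4 -> {
            // do stuff with res1, res2, res3, res4
            return result;
        });
    });
}).flatMap(inner -> inner); // zip emits the nested Observable, so flatten it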

This enforces the ordering you need:

  • observables 1 and 2

  • observable 3

  • observable 4
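To make the nesting idea concrete without pulling in RxJava, here is a rough JDK-only analogy using CompletableFuture, where thenCombine plays the role of zip and thenCompose the role of flatMap. The step helper and the String results are made up for illustration and stand in for func1() to func4(); treat it as a sketch of the shape, not as RxJava code.

```java
import java.util.concurrent.CompletableFuture;

class NestedChainSketch {
    // Hypothetical stand-in for func1()..func4(): starts an async task that
    // yields the given name. Unlike a cold Observable, it starts when called.
    static CompletableFuture<String> step(String name) {
        return CompletableFuture.supplyAsync(() -> name);
    }

    static String run() {
        CompletableFuture<String> f1 = step("r1");
        CompletableFuture<String> f2 = step("r2");

        // thenCombine waits for both f1 and f2 (the zip-like part).
        // Step 3 is only created inside the callback, so it cannot start earlier;
        // likewise step 4 is only created once r3 is available.
        CompletableFuture<CompletableFuture<String>> nested =
            f1.thenCombine(f2, (r1, r2) ->
                step("r3").thenCompose(r3 ->
                    step("r4").thenApply(r4 ->
                        // all four results are in scope here
                        r1 + "," + r2 + "," + r3 + "," + r4)));

        // Flatten the nested future, then block until the whole chain is done.
        return nested.thenCompose(inner -> inner).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints r1,r2,r3,r4
    }
}
```

Because each lambda is nested inside the previous one, r1, r2 and r3 are still in scope when r4 arrives, which is the whole trick.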

1
votes

I had the same kind of doubts a while ago, and the question seems to come down to how Observables really work.

Let's say you created obs1 and obs2 using something like:

Observable<GameObject> obs1 = Observable.create(...);
Observable<GameObject> obs2 = Observable.create(...);

You have 2 independent and disconnected streams. That's what you want when each of them is supposed to do something like a network request or some intensive background processing, which can take some time.

Now, let's say you want to watch for both results and emit a single value out of them when they are ready (you didn't say that explicitly, but it's going to help you understand how it works). In this case, you can use the zipWith operator, which takes a pair of items, one from each Observable, combines them into a single item, and emits it to whatever comes next in the chain. zipWith is called on an Observable and expects another Observable to zip with as a parameter. It also expects a custom function that knows how to combine the 2 source items into a new one.

Observable<CustomObject> obs3 = obs1.zipWith(obs2, new Func2<GameObject, GameObject, CustomObject>() {
    @Override 
    public CustomObject call(GameObject firstItem, GameObject secondItem) {
        return new CustomObject(firstItem, secondItem);
    }
});

In this case, CustomObject is just a POJO. But it could be another long-running task, or whatever you need to do with the results from the first two Observable items.

If you want to wait for (or rather, observe!) the results coming from obs3, you can plug another Observable onto the end to perform another piece of processing.

Observable<FinalResult> obs4 = obs3.map(new Func1<CustomObject, FinalResult>() {
    @Override
    public FinalResult call(CustomObject customObject) {
         return new FinalResult(customObject);
    }
});

The map operator transforms (or maps) one object into another. So you could perform another piece of processing, or any data manipulation, and return a result out of it. Or your FinalResult might be a regular class, like CustomObject, just holding references to the other GameObjects. You name it.
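The other way to keep earlier results around is the one described above: pack them into a holder object and pass that along. Here is a small JDK-only analogy, again with CompletableFuture (thenCombine roughly corresponds to zipWith, thenApply to map). The Pair and Summary classes are hypothetical stand-ins for CustomObject and FinalResult, and the String values are invented for the example.

```java
import java.util.concurrent.CompletableFuture;

class HolderChainSketch {
    // Hypothetical holder, in the spirit of CustomObject: keeps both inputs.
    static final class Pair {
        final String first;
        final String second;
        Pair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    // Hypothetical final value, in the spirit of FinalResult: keeps the holder,
    // so the earlier results stay reachable at the end of the chain.
    static final class Summary {
        final Pair pair;
        final int totalLength;
        Summary(Pair pair) {
            this.pair = pair;
            this.totalLength = pair.first.length() + pair.second.length();
        }
    }

    static Summary run() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "alpha");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "beta");

        return f1.thenCombine(f2, Pair::new)   // like zipWith: pack both results
                 .thenApply(Summary::new)      // like map: transform the holder
                 .join();                      // block until the chain completes
    }

    public static void main(String[] args) {
        Summary s = run();
        System.out.println(s.pair.first + " + " + s.pair.second + " -> " + s.totalLength);
    }
}
```

The final Summary still exposes both original values through s.pair, so nothing is lost along the way even though each step only sees the output of the step before it.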

Depending on how you created your Observables, they may not have started to emit any items yet. Until now you were just creating and plugging the data pipes. In order to trigger the first task and make items flow in the stream you need to subscribe to it.

obs4.subscribe();
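If the "nothing happens until you subscribe" part feels abstract, a toy model may help. The Cold class below is something I made up for illustration: a single-value source with just map and subscribe, written with plain JDK types. It is not how rx.Observable is implemented, only a sketch of the same idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class LazyPipelineSketch {
    /** Toy single-value cold source; a hypothetical model, not rx.Observable. */
    static final class Cold<T> {
        private final Consumer<Consumer<T>> onSubscribe;

        Cold(Consumer<Consumer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Returns a new Cold that applies fn; nothing runs yet. */
        <R> Cold<R> map(Function<T, R> fn) {
            return new Cold<>(observer ->
                onSubscribe.accept(item -> observer.accept(fn.apply(item))));
        }

        /** Runs the whole pipeline and hands the result to the observer. */
        void subscribe(Consumer<T> observer) {
            onSubscribe.accept(observer);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Cold<Integer> source = new Cold<>(observer -> {
            log.add("source ran");
            observer.accept(21);
        });
        Cold<Integer> doubled = source.map(x -> {
            log.add("map ran");
            return x * 2;
        });

        log.add("pipeline built");               // no work has happened so far
        doubled.subscribe(x -> log.add("got " + x));
        return log;
    }

    public static void main(String[] args) {
        // prints [pipeline built, source ran, map ran, got 42]
        System.out.println(run());
    }
}
```

Note that "pipeline built" is logged first: creating the source and calling map only wires things together, and the work starts at subscribe.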

Wrapping up, you don't really have one single variable passing along the whole chain. You actually create an item in the first Observable, which notifies the second one when it gets ready, and so on. Additionally, each step (observable) transforms the data somehow. So, you have a chain of transformations.

RxJava follows a functional approach, applying higher-order functions (map, zip, filter, reduce) to your data. It's crucial to have this clear. Also, the data is meant to be treated as immutable: operators don't change the Observable they are called on, and you shouldn't change your own objects either. Each step creates new instances, and the old objects will eventually be garbage collected. So obs1.zipWith(...) doesn't change obs1; it creates a new Observable instance, and you can assign it to a variable.

You can also drop the variable assignments (obs1, obs2, obs3 etc) and just chain all methods together. Everything is strongly typed, so the compiler will not let you plug Observables that don't match each other (output of one should match input of the next).

I hope this gives you some ideas!