9
votes

I'm new to RxJava, here's my case,

  1. send request A and will get List<A> back
  2. for each A, send request AA and will get AA back, bind A and AA then
  3. there is B & BB with similar logic
  4. do something only after all requests complete

Example:

request(url1, callback(List<A> listA) {
    for (A a : listA) {
        request(url2, callback(AA aa) {
            a.set(aa);
        }
    }
}

A and B are independent

How to structure the code? I also used Retrofit as network client.

1
Handlers and a progress integer to control if you reach the end of the logic or not. - Pedro Oliveira
@Oliveira, yes, handler is one solution, but I want to know if RxJava could offer a better and simple solution - fifth
What exactly do you mean by "bind A and AA"? So you have one request that yields a List of As, and then for each A there will be exactly one AA? Also, do you still need the As when you have the AAs? What are you going to do with the AAs - is there one action for each or a combined action for all AAs together? - david.mihola
@david.mihola, there is one AA for every A, 1-1 map. I can't get both A & AA in one request because of limit of server. The "bind A and AA" is the action after AA retrieved, sth like A.set(AA) - fifth

1 Answers

14
votes

OK, I think this should solve the first part of your problem:

Notice that the second call to flatMap is given 2 arguments - there is a version of flatMap that not only produces an Observable for each input item but that also take a second function which in turn will combine each item from the resulting Observable with the corresponding input item.

Have a look at the third graphic under this heading to get an intuitive understanding:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable

Observable<A> obeservableOfAs = retrofitClient.getListOfAs()
.flatMap(new Func1<List<A>, Observable<A>>() {

    @Override
    public Observable<A> call(List<A> listOfAs) {
        return Observable.from(listOfAs);
    }

)}
.flatMap(new Func1<A, Observable<AA>>() {

    @Override
    public Observable<AA> call(A someA) {
        return retrofitClient.getTheAaForMyA(someA);
    }

},
new Func2<A, AA, A>() {

    @Override
    public A call(A someA, AA theAaforMyA) {
        return someA.set(theAaforMyA);
    }

})
...

From here on I am still not sure how you want to continue: Are you ready to just subscribe to the resulting Observable of As? That way you could handle each of the As (onNext) or just wait until all are done (onCompleted).

ADDENDUM: To collect all Items into a single List at the end, that is turn your Observable<A> into an Observable<List<A>> use toList().

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist

So you have:

Observable<List<A>> observableOfListOfAs = observableOfAs.toList();

If you need more fine grained control over the construction of your list, you can also use reduce.

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce

For the Bs, simply duplicate the whole flow you used for the As.

You can then use zip to wait for both flows to complete:

Observable.zip(
    observableOfListOfAs,
    observableOfListOfBs,
    new Func2<List<A>, List<B>, MyPairOfLists>() {

        @Override
        public MyPairOfLists call(List<A> as, List<B> bs) {
            return new MyPairOfLists(as, bs);
        }
    }
)
.subscribe(new Subscriber<MyPairOfLists>() {

    // onError() and onCompleted() are omitted here

    @Override
    public void onNext(MyPairOfLists pair) {
        // now both the as and the bs are ready to use:

        List<A> as = pair.getAs();
        List<B> bs = pair.getBs();

        // do something here!
    }
});

I suppose you can guess the definition of MyPairOfLists.