3
votes

I have a SyncService.

@Override
public int onStartCommand(Intent intent, int flags, final int startId) {
    Timber.i("Starting sync...");
    ...
    RxUtil.unsubscribe(mSubscription);
    mSubscription = mDataManager.syncEvents()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Event>() {
                @Override
                public void onCompleted() {
                    Timber.i("Synced successfully!");
                    stopSelf(startId);
                }

                @Override
                public void onError(Throwable e) {
                    Timber.w(e, "Error syncing.");
                    stopSelf(startId);

                }

                @Override
                public void onNext(Event event) {
                }
            });

    return START_STICKY;
}

Observable<Event> events = mDataManager.syncEvents() is an API call.

I want to make a second call in parallel:

Single<UserInfo> userInfo = mDataManager.getUserInfo()

and call stopSelf(startId) only after both calls have finished.

How can I do it?

I looked at "RxJava Fetching Observables In Parallel", but my case is a little different.

I think I have to use the .zip or .merge method. But in my case one call returns an Observable (a list of Events) and the other returns a Single (one UserInfo object).

I created a result class that could hold the output of the .zip method, but I don't know how to fill it:

public class SyncResponse {
     List<Event> events;
     UserInfo userInfo;
     ...
}

2 Answers

2
votes

Since you have just two observables, you can zip them and use Android's Pair to hold the combined result.

mDataManager.syncEvents()
    .zipWith(mDataManager.getUserInfo().toObservable(), Pair::create)
    .subscribeOn(Schedulers.io())
    .subscribe(pair -> {
        // The Single contributes one item, so at most one pair (the first event) arrives here.
        Event event = pair.first;
        UserInfo userInfo = pair.second;

        // your code here

        Timber.i("Synced successfully!");
        stopSelf(startId);
    }, throwable -> {
        Timber.w(throwable, "Error syncing.");
        stopSelf(startId);
    });

You can do the same without Java 8 or Retrolambda, but the lambda version is much shorter.

If you need to collect all events until the stream completes and then combine the whole list with the single user info, it gets a little more complicated:

// Gather every Event into one list that is emitted when syncEvents() completes.
Observable<ArrayList<Event>> eventsObservable = mDataManager.syncEvents()
    .collect(ArrayList::new, ArrayList::add);

mDataManager.getUserInfo()
    .zipWith(eventsObservable.toSingle(), Pair::create)
    .subscribeOn(Schedulers.io())
    .subscribe(pair -> {
        UserInfo userInfo = pair.first;
        List<Event> events = pair.second;

        // your code here

        Timber.i("Synced successfully!");
        stopSelf(startId);
    }, throwable -> {
        Timber.w(throwable, "Error syncing.");
        stopSelf(startId);
    });
0
votes

You can use Observable.zip together with Single.toObservable() to accomplish this; the conversion is needed because the two calls return different wrapper types. I believe you also need to apply subscribeOn to each source separately inside the zip call so that they run in parallel.

Observable.zip(call1.subscribeOn(Schedulers.io()), call2.toObservable().subscribeOn(Schedulers.io()), zipFunction)

The zip function (zipFunction above) is where you would use your class to combine the results.

Note that because one of your sources is a Single, zip will produce at most one result: once converted to an Observable, the Single emits its one item and then completes, and zip stops as soon as it has nothing left to pair from that source.
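
To make the one-result behaviour concrete, here is a minimal sketch, assuming RxJava 1.x (the rx package) is on the classpath. The class name ZipWithSingleDemo and the sample strings are made up for illustration; they only stand in for syncEvents() and getUserInfo().

```java
import java.util.List;
import rx.Observable;
import rx.Single;

public class ZipWithSingleDemo {

    // Zips a three-item Observable with a Single and collects every zipped value.
    static List<String> zipEventsWithUser() {
        // Hypothetical stand-ins for the event stream and the user info.
        Observable<String> events = Observable.just("event-1", "event-2", "event-3");
        Single<String> user = Single.just("user");

        return events
                .zipWith(user.toObservable(), (event, u) -> event + "+" + u)
                .toList()       // gather whatever zip emitted
                .toBlocking()   // blocking is fine for a demo, not for a Service
                .single();
    }

    public static void main(String[] args) {
        // Only the first event is paired; the other two are dropped.
        System.out.println(zipEventsWithUser()); // prints [event-1+user]
    }
}
```

So if every event matters, the events have to be collected into one value before zipping.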

If you know the event stream will complete, you can call toList() on call1 to buffer the events and emit them as one collected list.
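
Putting the pieces together, the following is a rough sketch of the toList() plus zip approach, again assuming RxJava 1.x. Event, UserInfo and the two source methods are hypothetical stand-ins for the asker's types and for mDataManager.syncEvents() / mDataManager.getUserInfo(); SyncResponse is modelled on the class from the question, with a constructor added so it can serve as the zip function.

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;

public class ParallelSyncSketch {

    // Hypothetical stand-ins for the question's Event and UserInfo types.
    static class Event {
        final String name;
        Event(String name) { this.name = name; }
    }

    static class UserInfo {
        final String login;
        UserInfo(String login) { this.login = login; }
    }

    // Result holder modelled on the SyncResponse class from the question.
    static class SyncResponse {
        final List<Event> events;
        final UserInfo userInfo;
        SyncResponse(List<Event> events, UserInfo userInfo) {
            this.events = events;
            this.userInfo = userInfo;
        }
    }

    // Fake sources standing in for the two API calls.
    static Observable<Event> syncEvents() {
        return Observable.from(Arrays.asList(new Event("a"), new Event("b"), new Event("c")));
    }

    static Single<UserInfo> getUserInfo() {
        return Single.just(new UserInfo("demo"));
    }

    static Observable<SyncResponse> syncAll() {
        // Each source gets its own subscribeOn so the two can run concurrently.
        Observable<List<Event>> events = syncEvents()
                .toList()
                .subscribeOn(Schedulers.io());
        Observable<UserInfo> user = getUserInfo()
                .toObservable()
                .subscribeOn(Schedulers.io());

        // The zip function runs once, after both sources have delivered their value.
        return Observable.zip(events, user, SyncResponse::new);
    }

    public static void main(String[] args) {
        // Blocking here only keeps the demo short.
        SyncResponse response = syncAll().toBlocking().single();
        System.out.println(response.events.size() + " events for " + response.userInfo.login);
        // prints "3 events for demo"
    }
}
```

In the service itself you would subscribe to the zipped Observable instead of blocking, and call stopSelf(startId) from both the success and the error callback, as in the original onStartCommand.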