I'm trying to use Retrofit with RxJava. I have a problem chaining Retrofit observables with one another or with observables I create myself. For example:

    Observable<List<Friend>> friendsListObservable = friendsService.getFriends();
    Subscription updateRequestSubscription = friendsListObservable.switchMap(friends -> {
        Log.d(TAG, "Hello");
        return userAPI.updateFriends(session.getUserId(), friends);
    }).subscribe();

Everything gets called until it gets to switchMap. So "Hello" is never logged, but if I return, for instance, Observable.just(null) instead of the Retrofit observable, it works fine. Also, if I use the Retrofit observable without chaining, it works.

Edit 1: It's an Android app. It seems the map operator is not called at all. Sometimes this also happens with Retrofit observables. I still think it has something to do with threading. From what I understand, an operator is called when an item is emitted, but calling onNext doesn't trigger the map operator. Below is my whole code:

public Observable<List<FacebookFriend>> getFriends() {
    PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create();

    Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> {
        Log.d(TAG, "OnSubscribe called");
        Session session = Session.getActiveSession();

        if (session != null && session.isOpened()) {
            new Request(session, "/me/friends", null, HttpMethod.GET,
                    new Request.Callback() {
                        public void onCompleted(Response response) {
                            JSONObject graphResponse = response.getGraphObject()
                                    .getInnerJSONObject();
                            try {
                                JSONArray friends = graphResponse.getJSONArray("data");
                                Gson gson = new Gson();
                                Type listType = new TypeToken<ArrayList<FacebookFriend>>() {
                                }.getType();
                                List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType);

                                friendsPublishSubject.onNext(friendsList);
                                friendsPublishSubject.onCompleted();
                            } catch (JSONException e) {
                                e.printStackTrace();
                                friendsPublishSubject.onError(e);
                            }
                        }
                    }).executeAsync();
        } else {
            InvalidSessionException exception = new InvalidSessionException("Your facebook session expired");
            friendsPublishSubject.onError(exception);
        }
    });

    return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}

public Observable<Void> updateFriendsList() {
    Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends();

    Observable<Void> updateRequestObservable = facebookFriendsListObservable.map(friends -> {
        Log.d(TAG, "This is never called");
        return null; // map must return a value; null is the only value a Void can hold
    });

    return updateRequestObservable;
}

Have you tried flatMap or concatMap instead of switchMap? I don't think switchMap is the problem, but I don't know this operator well. - dwursteisen
I tried; no operator gets called, and it remains stuck. onError doesn't get called either. The only way that works is if I subscribe directly to the observable. I think it might have something to do with threading, but I'm not sure. I ended up using observable.toBlocking(). I know this is not good practice, but it will do for now. If someone stumbles into the same issue and finds an answer, don't hesitate to post it. - Jelly
Hmm, yeah, you're right. In fact, your call is made asynchronously, and your main method will finish before you get the result, so you'll have to block your main method. toBlocking is a good catch. - dwursteisen

1 Answer

One way you could get around blocking at the actual calling point would be to subscribe it to a subject and then block on the subject at the end of whatever part of your code requires the requests to have been executed.

For example:

final ReplaySubject<Void> subject = ReplaySubject.create();
friendsService.getFriends()
    .switchMap(friends -> userApi.updateFriends(session.getUserId(), friends))
    .subscribeOn(Schedulers.io())
    .subscribe(subject);

// Do other things that don't require this to be done yet...

// Wherever you need to wait for the request to happen:
subject.toBlocking().singleOrDefault(null);

Since a subject is also an Observable, you could even return the subject from a method, and block on it later.
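If you want to try the "start now, block later" shape without RxJava on the classpath, here is a minimal sketch that swaps in the JDK's CompletableFuture for the subject. This is an analogy, not the RxJava API: fetchFriends, updateFriends, and the uploaded counter are hypothetical stand-ins for the two service calls, and the method simply returns a handle that the caller can block on whenever it needs the result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredBlockingSketch {

    // Hypothetical stand-in for server state touched by the update call.
    static final AtomicInteger uploaded = new AtomicInteger();

    // Hypothetical stand-in for friendsService.getFriends().
    static List<String> fetchFriends() {
        return Arrays.asList("ann", "bob");
    }

    // Hypothetical stand-in for userApi.updateFriends(...).
    static void updateFriends(List<String> friends) {
        uploaded.set(friends.size());
    }

    // Starts the chained work on a background thread and returns a handle
    // that can be waited on later, much like returning the subject.
    static CompletableFuture<Void> startUpdate() {
        return CompletableFuture
                .supplyAsync(DeferredBlockingSketch::fetchFriends)
                .thenAccept(DeferredBlockingSketch::updateFriends);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = startUpdate();

        // Do other things that don't require the update to be done yet...

        // Block only at the point where the result is actually needed.
        pending.join();
        System.out.println("friends uploaded: " + uploaded.get());
    }
}
```

The point of the design is the same as with the subject: the work starts immediately, and the only blocking call sits where the result is required.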

All that being said, it does seem a bit odd to be using the update call as a side effect there. Is updateFriends also a Retrofit service? If so, you may want to consider making the update call a synchronous service that returns void instead of an Observable<Void> that you will call from within an onNext call. If it still needs to block, you can use forEach like this:

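friendsService.getFriends()
    .toBlocking()
    .forEach(friends -> { userApi.updateFriends(session.getUserId(), friends); });

On a BlockingObservable (what toBlocking() returns), forEach is very similar to subscribe, except that it explicitly blocks until the source completes. In RxJava 1.x, calling forEach directly on a plain Observable does not block; there it is just an alias for subscribe with an onNext action. You could even use it with your original code, but adding an empty onNext action wouldn't be terribly clean.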
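To make "blocks" concrete: a blocking forEach can be thought of as a normal subscribe followed by waiting on a latch that the completion callback releases. The sketch below models that with plain JDK classes only. subscribeAsync is a hypothetical callback-style source that I made up for illustration, not an RxJava method, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class BlockingForEachSketch {

    // Hypothetical asynchronous source: delivers each item, then the
    // completion signal, on a background thread and returns immediately.
    static <T> void subscribeAsync(List<T> items, Consumer<T> onNext, Runnable onCompleted) {
        Thread worker = new Thread(() -> {
            for (T item : items) {
                onNext.accept(item);
            }
            onCompleted.run();
        });
        worker.start();
    }

    // Blocking variant: subscribe, then park the calling thread until
    // the source has signalled completion.
    static <T> void blockingForEach(List<T> items, Consumer<T> onNext) {
        CountDownLatch done = new CountDownLatch(1);
        subscribeAsync(items, onNext, done::countDown);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for completion", e);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        blockingForEach(Arrays.asList("a", "b", "c"), seen::add);
        // All items have been delivered by the time blockingForEach returns.
        System.out.println(seen); // prints [a, b, c]
    }
}
```

Calling subscribeAsync on its own returns before any item is guaranteed to have been handled, which is the situation described in the comments above; the latch is what turns it into a blocking call.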

If you can also provide more details about your app structure (is this all in a main method? is it an Android app? etc.) I could give some more pointers on avoiding blocking as much as possible.