11
votes

I'm trying to wrap my head around RxJava currently, but I'm having a little trouble with handling service call exceptions in an elegant manner.

Basically, I have a (Retrofit) service that returns an Observable<ServiceResponse>. ServiceResponse is defined like so:

public class ServiceResponse {
    private int status;
    private String message;
    private JsonElement data;

    public JsonElement getData() {
        return data;
    }

    public int getStatus() {
        return status;
    }

    public String getMessage() {
        return message;
    }
}

Now what I want is to map that generic response to a List<Account> contained within the data JsonElement field (I assume you don't care what the Account object looks like, so I won't pollute the post with it). The following code works really well for the success case, but I can't find a nice way to handle my API exceptions:

service.getAccounts()
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .map(new Func1<ServiceResponse, AccountData>() {
               @Override
               public AccountData call(ServiceResponse serviceResponse) {

                   // TODO: ick. fix this. there must be a better way...
                   ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
                   switch (responseType) {
                       case SUCCESS:
                           Gson gson = new GsonBuilder().create();
                           return gson.fromJson(serviceResponse.getData(), AccountData.class);
                       case HOST_UNAVAILABLE:
                           throw new HostUnavailableException(serviceResponse.getMessage());
                       case SUSPENDED_USER:
                           throw new SuspendedUserException(serviceResponse.getMessage());
                       case SYSTEM_ERROR:
                       case UNKNOWN:
                       default:
                           throw new SystemErrorException(serviceResponse.getMessage());
                   }
              }
        })
        .map(new Func1<AccountData, List<Account>>() {
                @Override
                public List<Account> call(AccountData accountData) {
                    Gson gson = new GsonBuilder().create();
                    List<Account> res = new ArrayList<Account>();
                    for (JsonElement account : accountData.getAccounts()) {
                        res.add(gson.fromJson(account, Account.class));
                    }
                    return res;
                }
        })
        .subscribe(accountsRequest);

Is there a better way to do this? This does work, onError will fire to my observer, and I will receive the error that I threw, but it definitely does not seem like I'm doing this right.

Thanks in advance!

Edit:

Let me clarify exactly what I want to achieve:

I want to have a class that can be called from the UI (e.g. an Activity, or Fragment, or whatever). That class would take an Observer<List<Account>> as a parameter like so:

public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {
    ...
}

That method would return a subscription that can be unsubscribed when the UI is detached/destroyed/etc.

The parameterized observer would handle onNext for the successful responses passing in a list of Accounts. OnError would handle any exceptions, but would also get passed any API exceptions (e.g. if the response status != 200 we would create a Throwable and pass it to onError). Ideally I don't want to just "throw" the Exception, I want to pass it directly to the Observer. That's what all the examples I see do.

The complication is that my Retrofit service returns a ServiceResponse object, so my observer cannot subscribe to that. The best I've come up with is to create an Observer wrapper around my Observer, like so:

@Singleton
public class AccountsDatabase {

    private AccountsService service;

    private List<Account> accountsCache = null;
    private PublishSubject<ServiceResponse> accountsRequest = null;

    @Inject
    public AccountsDatabase(AccountsService service) {
        this.service = service;
    }

    public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {

        ObserverWrapper observerWrapper = new ObserverWrapper(observer);

        if (accountsCache != null) {
            // We have a cached value. Emit it immediately.
            observer.onNext(accountsCache);
        }

        if (accountsRequest != null) {
            // There's an in-flight network request for this section already. Join it.
            return accountsRequest.subscribe(observerWrapper);
        }

        if (accountsCache != null && !forceRefresh) {
            // We had a cached value and don't want to force a refresh on the data. Just
            // return an empty subscription
            observer.onCompleted();
            return Subscriptions.empty();
        }

        accountsRequest = PublishSubject.create();

        accountsRequest.subscribe(new ObserverWrapper(new EndObserver<List<Account>>() {

            @Override
            public void onNext(List<Account> accounts) {
                accountsCache = accounts;
            }

            @Override
            public void onEnd() {
                accountsRequest = null;
            }
        }));

        Subscription subscription = accountsRequest.subscribe(observerWrapper);

        service.getAccounts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(accountsRequest);

        return subscription;
    }

    static class ObserverWrapper implements Observer<ServiceResponse> {

        private Observer<List<Account>> observer;

        public ObserverWrapper(Observer<List<Account>> observer) {
            this.observer = observer;
        }

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(ServiceResponse serviceResponse) {
            ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
            switch (responseType) {
                case SUCCESS:
                    Gson gson = new GsonBuilder().create();
                    AccountData accountData = gson.fromJson(serviceResponse.getData(), AccountData.class);
                    List<Account> res = new ArrayList<>();
                    for (JsonElement account : accountData.getAccounts()) {
                        res.add(gson.fromJson(account, Account.class));
                    }
                    observer.onNext(res);
                    observer.onCompleted();
                    break;
                default:
                    observer.onError(new ApiException(serviceResponse.getMessage(), responseType));
                    break;
            }
        }
    }
}

I still feel like I am not using this correctly though. I definitely haven't seen anyone else using an ObserverWrapper before. Perhaps I shouldn't be using RxJava, though the guys at SoundCloud and Netflix really sold me on it in their presentations and I'm pretty eager to learn it.

2

2 Answers

13
votes

Please read below I've added an edit.

It's perfectly correct to throw within an Action/Func/Observer with RxJava. The exception will be propagate by the framework right down to your Observer. If you limit yourself to calling onError only then you'll be twisting yourself to make that happen.

With that being said a suggestion would be to simply remove this wrapper and add a simple validation Action within the service.getAccount... chain of Observables.

I'd use the doOnNext(new ValidateServiceResponseOrThrow) chained with a map(new MapValidResponseToAccountList). Those are simple classes which implements the necessary code to keep the Observable chain a bit more readable.

Here's your loadAccount method simplified using what I suggested.

public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) {
    if (accountsCache != null) {
        // We have a cached value. Emit it immediately.
        observer.onNext(accountsCache);
    }

    if (accountsRequest != null) {
        // There's an in-flight network request for this section already. Join it.
        return accountsRequest.subscribe(observer);
    }

    if (accountsCache != null && !forceRefresh) {
        // We had a cached value and don't want to force a refresh on the data. Just
        // return an empty subscription
        observer.onCompleted();
        return Subscriptions.empty();
    }

    accountsRequest = PublishSubject.create();
    accountsRequest.subscribe(new EndObserver<List<Account>>() {

        @Override
        public void onNext(List<Account> accounts) {
            accountsCache = accounts;
        }

        @Override
        public void onEnd() {
            accountsRequest = null;
        }
    });

    Subscription subscription = accountsRequest.subscribe(observer);

    service.getAccounts()
            .doOnNext(new ValidateServiceResponseOrThrow())
            .map(new MapValidResponseToAccountList())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(accountsRequest);

    return subscription;
}

private static class ValidateResponseOrThrow implements Action1<ServiceResponse> {
        @Override
        public void call(ServiceResponse response) {
            ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
            if (responseType != SUCCESS)
                throw new ApiException(serviceResponse.getMessage(), responseType));
        }
    }

private static class MapValidResponseToAccountList implements Func1<ServiceResponse, List<Account>> {
    @Override
    public Message call(ServiceResponse response) {
        // add code here to map the ServiceResponse into the List<Accounts> as you've provided already
    }
}

Edit: Unless someone says otherwise I think it's best practice to return errors using flatMap. I've thrown Exceptions from Action in the past but I don't believe it's the recommended way.

You'll have a cleaner Exception stack if you use flatMap. If you throw from inside an Action the Exception stack will actually contain rx.exceptions.OnErrorThrowable$OnNextValue Exception which isn't ideal.

Let me demonstrate the example above using the flatMap instead.

private static class ValidateServiceResponse implements rx.functions.Func1<ServiceResponse, Observable<ServiceResponse>> {
    @Override
    public Observable<ServiceResponse> call(ServiceResponse response) {
        ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus());
        if (responseType != SUCCESS)
            return Observable.error(new ApiException(serviceResponse.getMessage(), responseType));
        return Observable.just(response);
    }
}

service.getAccounts()
    .flatMap(new ValidateServiceResponse())
    .map(new MapValidResponseToAccountList())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(accountsRequest);

As you can see the the difference is subtle. The ValidateServiceResponse now implements the Func1 instead of Action1 and we're no longer using the throw keyword. We use Observable.error(new Throwable) instead. I believe this fits better with the expected Rx contract.

0
votes