
How do I transform an Observable's error into another Observable?

I am trying to implement the algorithm shown in the diagram below:

Block scheme

I am using a tutorial named Grokking RxJava to start learning, and found that the flatMap operator can convert one Observable returned by Retrofit into another, which lets me chain calls to server methods. But in the tutorial the chained methods always return a successful result. How can I do the same chaining if one of the remote methods returns an error?

Currently I'm using an approach that looks odd and unclear to me:

AppObservable.bindActivity(this, userService.checklogin(mPhone)
            .onErrorResumeNext(new Func1<Throwable, Observable<? extends Response>>() {
                @Override
                public Observable<? extends Response> call(Throwable throwable) {
                    return Observable.just(null);
                }
            }))
            .flatMap(new Func1<Response, Observable<Response>>() {
                @Override
                public Observable<Response> call(Response response) {
                    if (response == null) {
                        return AppObservable.bindActivity(RegistrationActivity.this, userService.register(
                                mPhone,
                                name.getText().toString(),
                                selectedSex,
                                selectedDateDay,
                                selectedDateMonth,
                                selectedDateYear,
                                Locale.getDefault().getLanguage(),
                                persistentUserInfoStore.getInstallationToken()
                        ));
                    }
                    phone.setError(getString(R.string.already_registered_phone));
                    progressDialog.dismiss();
                    return Observable.empty();
                }
            })
            .subscribe(new Action1<Response>() {
                @Override
                public void call(Response response) {
                    startConfirmationActivity();
                    progressDialog.dismiss();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    progressDialog.dismiss();
                    Toast.makeText(RegistrationActivity.this, "Error while register user", Toast.LENGTH_SHORT).show();
                }
            });

Is there a clearer way to do this?

P.S. I am very new to RxJava.

Why don't you call userService.register from the onErrorResumeNext function? - Vladimir Mironov
@VladimirMironov I am not sure what will happen after that. If I understand correctly, if I return userService.register from onErrorResumeNext, then when userService.checkLogin fails, the subscriber will be resubscribed to userService.register and will consume its emissions. How can I then tell successful emissions of userService.register apart from those of userService.checkLogin? - Vladimir Tagakov
How about something like pastebin.com/Cu32c8tc? - Vladimir Mironov
@VladimirMironov It looks nice and clear. I think you should post that solution as an answer; I'll accept it. - Vladimir Tagakov
Take a look at retrolambda (github.com/evant/gradle-retrolambda); it could simplify the code. - Daniel Gomez Rico

1 Answer

  1. You should call userService.register from onErrorResumeNext.
  2. The easiest way to distinguish which kind of error happened is to introduce a separate Exception class for each error.

This is how it looks in code:

userService.checkLogin(mPhone).flatMap(new Func1<Response, Observable<? extends Response>>() {
    @Override
    public Observable<? extends Response> call(final Response response) {
        // According to your diagram, the whole Observable should fail if the first call was successful
        return Observable.error(new FirstCallWasSuccessfulException(response));
    }
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Response>>() {
    @Override
    public Observable<? extends Response> call(final Throwable throwable) {
        if (throwable instanceof FirstCallWasSuccessfulException) {
            return Observable.error(throwable);
        }

        return userService.register(...);
    }
}).subscribe(new Subscriber<Response>() {
    @Override
    public void onError(final Throwable throwable) {
        if (throwable instanceof FirstCallWasSuccessfulException) {
            // foo
        } else {
            // bar
        }
    }

    @Override
    public void onNext(final Response response) {

    }

    @Override
    public void onCompleted() {

    }
});
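
The code above references FirstCallWasSuccessfulException without defining it. A minimal sketch of such a class could look like the following. The Response class here is only a placeholder standing in for the Retrofit type used in the question, and the exception message, the getResponse accessor, and the choice of RuntimeException as the base class are assumptions made for illustration, not part of the original answer.

```java
// Placeholder for the Retrofit Response type from the question (assumption:
// the real class comes from the Retrofit library and looks different).
class Response {
    private final String body;

    Response(String body) {
        this.body = body;
    }

    String getBody() {
        return body;
    }
}

// Marker exception signalling that checkLogin succeeded, which in the
// question's diagram means the phone number is already registered.
// Extending RuntimeException is an illustrative choice; Observable.error
// accepts any Throwable.
class FirstCallWasSuccessfulException extends RuntimeException {
    private final Response response;

    FirstCallWasSuccessfulException(Response response) {
        super("First call was successful");
        this.response = response;
    }

    // Lets the subscriber's onError inspect the original successful response.
    Response getResponse() {
        return response;
    }
}
```

With a class like this, the instanceof branch in onError can cast the Throwable and read getResponse() if the already-registered case needs details from the first call.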