1
votes

I'm new in RxJava framework. My system use token authentication, when I want to get any response from http request I need to refresh my auth token because of expiration time.

But I got a problem after invoking subscribe method from one observable to another observable

Main class (First observable)

service.getPosts(offset,limit, PreferencesUtils.getAccessTokenKey())
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(response -> {
            if (!response.isSuccessful()) throw new RuntimeException();
        })
        .subscribe(
            feeds -> mView.showNewsFeedItems(feeds.body()),
            error -> {
                Helper.refreshToken();
                service.getPosts(offset,limit,PreferencesUtils.getAccessTokenKey()); 
            }
        );

My response is not successful and I need to invoke refreshToken() where I can refresh token and save it to PreferencesUtils

Helper class (Second observable)

service.refreshToken(PreferencesUtils.getAccessTokenKey(), PreferencesUtils.getRefreshTokenKey())
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.newThread())
        .subscribe(


// doesnt proceed this part of code
                response -> {
                    PreferencesUtils.saveAccessToken(response.body().getAccessToken());
                    PreferencesUtils.saveRefreshToken(response.body().getRefreshToken());
                },

                error -> Log.d(TAG, "refresh token exception: " + `error.getMessage())`
//

            );

When I using debugger, Second observable from Helper class doesn't work properly it doesn't want to proceed this service and has finished immediately on subscribe without any error (definitely it's waiting for finishing all operations from main thread). Because of the subscribe method didn't proceed (wasn't save access token to preferences) I can't execute my service.getPosts() function.

Help please! I will be very gratefull!

1

1 Answers

1
votes

Rather than splitting up the observer chain like that, use the retryWhen() operator to respond to a failed token.

Observable.just(Boolean.TRUE)
    .flatMap( ignored -> service
                           .getPosts(offset,limit,
                                      PreferencesUtils.getAccessTokenKey()) )
    .subscribeOn(Schedulers.computation())
    .doOnNext(response -> {
        if (!response.isSuccessful()) throw new TokenException();
    })
    .retryWhen( error -> error.flatMap( f ->
      {
        if ( f instanceof TokenException ) {
          return service.refreshToken(PreferencesUtils.getAccessTokenKey(), PreferencesUtils.getRefreshTokenKey())
            .doOnNext( response -> {
                PreferencesUtils.saveAccessToken(response.body().getAccessToken());
                PreferencesUtils.saveRefreshToken(response.body().getRefreshToken());
            });
        }
        return Observable.error(f);
      })
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe( /* do stuff */ );

The retryWhen() operator takes an observable that will get whatever exception was thrown upstream. If the exception is a TokenException, it will attempt to refresh the token by invoking the observable returned by refreshToken() and then saving the result to your preferences.

Edit: Start the observer chain with an observable that can be re-subscribed to without issue. When the access token has to be refreshed, getPosts() will be invoked with the new token.