
I'm new to RxJava 2 and want to retry a Completable server API call until success, while emitting notifications of the retry attempts so that my UI can display the retry status to the user.

Something like this:

public Observable<RetryAttempt> retryServerCall() {

    // execute Completable serverCall()

    // if an error is thrown, emit new RetryAttempt(++retryCount, error) to subscriber

    // retry until serverCall() is successful
}

public Completable serverCall();

public class RetryAttempt {
    public RetryAttempt(int retryCount, Throwable cause);
}

I've tried a few different approaches and run into roadblocks. The closest is the approach below, which creates an enclosing Observable and explicitly calls onNext() / onComplete() / onError().

public Observable<RetryAttempt> retryServerCall() {
    final int[] retryCount = {0};
    return Observable.create(e ->
        serverCall()
                .doOnError(throwable -> e.onNext(new RequestHelp.RetryAttempt(++retryCount[0], throwable)))
                .retry()
                .subscribe(() -> e.onComplete(), throwable -> e.onError(throwable)));
}

Perhaps it's a somewhat peripheral matter, but I had to use the final array for retryCount to avoid the error "variable used in lambda should be final or effectively final".

I know there must be a better way to accomplish this using Rx voodoo. Any guidance is much appreciated!

You don't want to do it exactly like this, as you will lose unsubscription signalling. - Tassos Bassoukos
@Tassos True. I can use subscribeWith to get a Disposable for the inner Observable, and then dispose via setDisposable, correct? - HolySamosa

1 Answer

public Single<List<Farmer>> getAllFarmers(long timestamp) {
    return Observable.fromCallable(() -> mapiFactory.getAllFarmerAboveTime(timestamp))
            .doOnError(throwable -> Log.d(TAG, "Error calling getAllFarmers: " + throwable.getMessage()))
            .retryWhen(new RetryWithDelay(5, 1000))
            .concatMap(farmersResponse -> Observable.fromIterable(farmersResponse.farmer))
            .filter(farmer -> !StringUtils.isBlank(farmer.cnic))
            .map(this::validateCnic)
            .distinct(farmer -> farmer.cnic)
            .toList();
}

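When the callable passed to fromCallable() throws an exception, retryWhen(new RetryWithDelay(5, 1000)) takes over and re-subscribes to the source after a delay. With the RetryWithDelay class in this answer, the delay grows linearly rather than exponentially (1000 ms, 2000 ms, 3000 ms, 4000 ms). Because the check is ++_retryCount < _maxRetries, a maxRetries of 5 allows at most 4 retries (5 attempts in total) before the error is passed downstream.

Here is RetryWithDelay: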

// Imports assume RxJava 2 on Android (Log is android.util.Log, as in the snippet above).
import android.util.Log;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;

public class RetryWithDelay implements Function<Observable<Throwable>, Observable<?>> {

    // TAG was not defined in the original snippet; this value is an assumption.
    private static final String TAG = "RetryWithDelay";

    private final int _maxRetries;
    private final int _retryDelayMillis;
    private int _retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        _maxRetries = maxRetries;
        _retryDelayMillis = retryDelayMillis;
        _retryCount = 0;
    }

    @Override
    public Observable<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                if (++_retryCount < _maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    Log.d(TAG, String.format("Retrying in %d ms", _retryCount * _retryDelayMillis));
                    return Observable.timer(_retryCount * _retryDelayMillis, TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Pass the error downstream so the chain terminates:
                // only onNext triggers a re-subscription (onError or onComplete ends it).
                return Observable.error(throwable);
            }
        });
    }
}
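
The retry schedule that RetryWithDelay produces can be checked with plain arithmetic, without running any Rx code. The sketch below is only an illustration: RetrySchedule and delays() are hypothetical names that are not part of RxJava or of the class above. It simply repeats the ++_retryCount < _maxRetries check and the _retryCount * _retryDelayMillis multiplication to list the delays that would be scheduled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: it mirrors the counter logic of
// RetryWithDelay so the resulting delays can be inspected.
class RetrySchedule {

    // Returns the delay (in ms) before each retry, in order.
    public static List<Long> delays(int maxRetries, int retryDelayMillis) {
        List<Long> delays = new ArrayList<>();
        int retryCount = 0;
        // Same condition as in RetryWithDelay: the counter is incremented on
        // every error, and a retry is scheduled only while it is below maxRetries.
        while (++retryCount < maxRetries) {
            delays.add((long) retryCount * retryDelayMillis);
        }
        return delays;
    }

    public static void main(String[] args) {
        // prints [1000, 2000, 3000, 4000]
        System.out.println(delays(5, 1000));
    }
}
```

For new RetryWithDelay(5, 1000) this gives four retries with a linearly growing delay. If five retries are wanted, the comparison in RetryWithDelay would need to be <= instead of <, or maxRetries would need to be 6.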