3
votes

I have an API that takes an Observable that triggers an event.

I want to return an Observable that emits a value every defaultDelay seconds if an Internet connection is detected, and delays numberOfFailedAttempts^2 times if there's no connection.

I've tried a bunch of various styles, the biggest problem I'm having is retryWhen's observable is only evaluated once:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

Is there any way to do what I'm attempting to do? I found a related question (can't find it searching right now), but the approach taken didn't seem to work with a dynamic value.

3

3 Answers

5
votes

In your code there are two mistakes:

  1. In order to repeat some observable sequence, that sequence has to be finite. I.e. instead of interval you'd better use something like just, or fromCallable as I did in sample below.
  2. From repeatWhen's inner function you need to return new delayed observable source, so instead of observable.delay() you have to return Observable.timer().

Working code:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

See detailed article about repeatWhen here.

2
votes

I've always found retryWhen to be somewhat low-level so for exponential backoff I use a a builder (like Abhijit) that is unit tested and available for RxJava 1.x at rxjava-extras. I'd suggest using a capped version so that the exponential increase of delay won't go beyond a maximum value you define.

This is how you use it:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
        delay, maxDelay, TimeUNIT.SECONDS)
    .build());

I disagree that retryWhen is buggy but if you find a bug report it to RxJava. Bugs are fixed fast!

You'll need rxjava-extras 0.8.0.6 or later which is on Maven Central:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.8.0.6</version>
</dependency>

Let me know if you need the RxJava 2.x version. The same functionality is available in rxjava2-extras from 0.1.4.

1
votes

You can use the retryWhen operator to configure the delay when there's no connection. How to periodically emit items is a separate topic (look up interval or timer operators). Open a separate question if you can't figure it out.

I've an extensive example on my Github, but I'll give you the gist here.

RetryWithDelay retryWithDelay = RetryWithDelay.builder()
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
    .build()

Single.fromCallable(() -> {
    ...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
    ...
})

RetryWithDelay is defined as follows. I used RxJava 2.x, so if you're using 1.x, the signature should be Func1<Observable<? extends Throwable>, Observable<Object>>.

public class RetryWithDelay implements
        Function<Flowable<? extends Throwable>, Publisher<Object>> {
    ...
}

RetryWithDelay class.

RetryStrategy enum.

This allows me to configure various sorts of timeouts, constant, linear, exponential, based on the RetryDelayStrategy. For your use case, you'd choose CONSTANT_DELAY_TIMES_RETRY_COUNT delay strategy and call retryDelaySeconds(2) when building RetryWithDelay.

retryWhen is a complicated, perhaps even buggy, operator. Most examples you'll find online use the range operator, which will fail if there are no retries to be made. See my answer here for details.