16
votes

We can use the cache() operator to avoid executing a long task (an HTTP request) multiple times, and reuse its result:

Observable apiCall = createApiCallObservable().cache(); // notice the .cache()

---------------------------------------------
// the first time we need it
apiCall.andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again
apiCall.andSomeDifferentStuff()
               .subscribe(subscriberB);

The first time, the HTTP request is executed. The second time, because we used the cache() operator, the request is not executed again and we reuse the first result.

This works fine when the first request completes successfully. But if onError is called on the first attempt, then the next subscriber to the same observable gets the same onError again, without the HTTP request being attempted again.
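To make the problem concrete without pulling in RxJava, here is a minimal plain-Java sketch of the same behaviour. It is only an analogue: CacheEverything and CacheEverythingDemo are hypothetical names, and a CompletableFuture stands in for the cached Observable. The point is that a cache which stores the first outcome unconditionally will replay a failure just as readily as a result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for an Observable wrapped in cache(): the first
// outcome is stored and replayed to every later caller, success or failure.
class CacheEverything<T> {
    private final Supplier<T> request;
    private CompletableFuture<T> stored;

    CacheEverything(Supplier<T> request) {
        this.request = request;
    }

    synchronized CompletableFuture<T> get() {
        if (stored == null) {
            stored = new CompletableFuture<>();
            try {
                stored.complete(request.get());
            } catch (RuntimeException e) {
                stored.completeExceptionally(e); // the failure is cached too
            }
        }
        return stored;
    }
}

class CacheEverythingDemo {
    // Returns how many times the fake request ran after two callers asked for it.
    static int run() {
        AtomicInteger attempts = new AtomicInteger();
        CacheEverything<String> cache = new CacheEverything<>(() -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("network down");
        });
        for (int caller = 0; caller < 2; caller++) {
            try {
                cache.get().join();
            } catch (CompletionException expected) {
                // both callers receive the same stored error
            }
        }
        return attempts.get();
    }
}
```

Calling CacheEverythingDemo.run() reports a single attempt even though two callers asked for the value, which is the behaviour we want to get rid of for failed requests.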

What we are trying to do is this: if onError is called the first time, then the next time someone subscribes to the same observable, the HTTP request should be attempted from scratch. In other words, the observable should cache only the successful API calls, i.e. those for which onCompleted was called.

Any ideas about how to proceed? We've tried using the retry() and cache() operators without much luck.

5 Answers

8
votes

Well, for anyone still interested, I think I have a nicer way to achieve it with Rx.

The key is to use onErrorResumeNext, which lets you replace the Observable in case of an error. So it should look something like this:

Observable<Object> apiCall = createApiCallObservable().cache(1);
// future call
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
    public Observable<? extends Object> call(Throwable throwable) {
        return createApiCallObservable();
    }
});

That way, if the first call has failed, the future call will just make the request again (only once).

But every other caller that uses the first observable will still fail and then make its own separate request.

Since you hold a reference to the original observable, let's just update it.

So, a lazy getter:

Observable<Object> apiCall;
private Observable<Object> getCachedApiCall() {
    if ( apiCall == null){
        apiCall = createApiCallObservable().cache(1);
    }
    return apiCall;
}

Now, a getter that will retry if the previous call failed:

private Observable<Object> getRetryableCachedApiCall() {
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            apiCall = null;
            return getCachedApiCall();
        }
    });
}

Please note that it will only retry once for each time it is called.

So now your code will look something like this:

---------------------------------------------
// the first time we need it - this will be without a retry if you want..
getCachedApiCall().andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
//in the future when we need it again - for any other call so we will have a retry
getRetryableCachedApiCall().andSomeDifferentStuff()
               .subscribe(subscriberB);
8
votes

This is the solution we ended up with, after extending akarnokd's solution:

import java.util.concurrent.Semaphore;

import rx.Observable;

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
         return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) {
        singlePermit.acquireUninterruptibly();

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

We needed to cache the result of an HTTP request made through Retrofit, so this was written with an observable that emits a single item in mind.

If an observer subscribes while the HTTP request is still executing, we want it to wait rather than execute the request a second time, unless the in-progress request fails. To do that, the semaphore allows only one subscriber at a time into the block that creates or returns the cached observable, and when a new observable is created, the permit is held until that observable terminates. Tests for the above can be found here.
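The permit logic may be easier to see stripped of Rx. What follows is a minimal blocking sketch in plain Java, not the class above: SinglePermitCache and SinglePermitCacheDemo are hypothetical names, the request is a synchronous Supplier, and the permit is released in a finally block instead of in doOnTerminate. It assumes the request never returns null, since null marks "nothing cached yet".

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical blocking analogue: one caller at a time may check the cache or
// run the request, and only a successful result is stored.
class SinglePermitCache<T> {
    private final Semaphore singlePermit = new Semaphore(1);
    private final Supplier<T> request;
    private T cache; // null until a request has succeeded

    SinglePermitCache(Supplier<T> request) {
        this.request = request;
    }

    T get() {
        singlePermit.acquireUninterruptibly(); // later callers wait here
        try {
            if (cache != null) {
                return cache;
            }
            T value = request.get(); // may throw; nothing is cached in that case
            cache = value;
            return value;
        } finally {
            singlePermit.release(); // released on success and on failure
        }
    }
}

class SinglePermitCacheDemo {
    // Returns how many times the fake request ran across three calls.
    static int run() {
        AtomicInteger attempts = new AtomicInteger();
        SinglePermitCache<String> cache = new SinglePermitCache<>(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("first attempt fails");
            }
            return "response";
        });
        try {
            cache.get();
        } catch (IllegalStateException expected) {
            // the failure is reported to this caller but not remembered
        }
        cache.get(); // runs the request again and caches the result
        cache.get(); // served from the cache
        return attempts.get();
    }
}
```

In this sketch the failed first call leaves the cache empty, so the second call runs the request again and the third is served from the cache, for two attempts in total.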

5
votes

You have to do some state-handling. Here is how I'd do this:

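import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;

public class CachedRetry {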

    public static final class OnErrorRetryCache<T> {
        final AtomicReference<Observable<T>> cached = 
                new AtomicReference<>();

        final Observable<T> result;

        public OnErrorRetryCache(Observable<T> source) {
            result = Observable.defer(() -> {
                for (;;) {
                    Observable<T> conn = cached.get();
                    if (conn != null) {
                        return conn;
                    }
                    Observable<T> next = source
                            .doOnError(e -> cached.set(null))
                            .replay()
                            .autoConnect();

                    if (cached.compareAndSet(null, next)) {
                        return next;
                    }
                }
            });
        }

        public Observable<T> get() {
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable
                .just(1)
                .doOnSubscribe(() -> 
                    System.out.println("Subscriptions: " + (1 + calls.get())))
                .flatMap(v -> {
                    if (calls.getAndIncrement() == 0) {
                        return Observable.error(new RuntimeException());
                    }
                    return Observable.just(42);
                });

        Observable<Integer> o = new OnErrorRetryCache<>(source).get();

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));
    }
}

It works by caching a fully successful source and returning it to everyone. Otherwise, a (partially) failed source will clear the cache, and the next observer to subscribe will trigger a resubscription.
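For readers who want to see the compare-and-set idea in isolation, here is a rough plain-Java analogue. It is a sketch under assumptions, not the code above: SuccessOnlyCache and SuccessOnlyCacheDemo are hypothetical names, and a CompletableFuture plays the role of the replayed Observable. The shared future is installed with compareAndSet and removed again as soon as it fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical analogue: keeps a shared in-flight or successful result and
// forgets it as soon as it fails.
class SuccessOnlyCache<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> source;

    SuccessOnlyCache(Supplier<CompletableFuture<T>> source) {
        this.source = source;
    }

    CompletableFuture<T> get() {
        for (;;) {
            CompletableFuture<T> current = cached.get();
            if (current != null) {
                return current; // in flight or already succeeded
            }
            CompletableFuture<T> next = new CompletableFuture<>();
            if (!cached.compareAndSet(null, next)) {
                continue; // another thread installed a future first; use theirs
            }
            try {
                // Only the thread that won the race starts the request.
                source.get().whenComplete((value, error) -> {
                    if (error != null) {
                        cached.compareAndSet(next, null); // clear first so the next caller retries
                        next.completeExceptionally(error);
                    } else {
                        next.complete(value);
                    }
                });
            } catch (RuntimeException e) {
                cached.compareAndSet(next, null);
                next.completeExceptionally(e);
            }
            return next;
        }
    }
}

class SuccessOnlyCacheDemo {
    // Fails on the first call only, mirroring the main method above.
    static String run() {
        AtomicInteger calls = new AtomicInteger();
        SuccessOnlyCache<Integer> cache = new SuccessOnlyCache<>(() -> {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            if (calls.getAndIncrement() == 0) {
                f.completeExceptionally(new RuntimeException("first attempt fails"));
            } else {
                f.complete(42);
            }
            return f;
        });
        boolean firstFailed = cache.get().isCompletedExceptionally();
        int second = cache.get().join();
        int third = cache.get().join();
        return firstFailed + " " + second + " " + third + " calls=" + calls.get();
    }
}
```

SuccessOnlyCacheDemo.run() returns "true 42 42 calls=2": the first caller sees the failure, the second triggers a fresh request, and the third reuses the cached success.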

2
votes

The solution by Plato is spot on! In case anyone needs a Kotlin version with an extension function and a parametrised cache size, here it is.

import io.reactivex.Flowable
import java.util.concurrent.Semaphore

// Despite its name, retries is the replay buffer size (null means unbounded).
class OnErrorRetryCache<T> constructor(source: Flowable<T>, private val retries: Int? = null) {

    val deferred: Flowable<T>
    private val singlePermit = Semaphore(1)

    private var cache: Flowable<T>? = null
    private var inProgress: Flowable<T>? = null

    init {
        deferred = Flowable.defer { createWhenObserverSubscribes(source) }
    }

    private fun createWhenObserverSubscribes(source: Flowable<T>): Flowable<T> {
        singlePermit.acquireUninterruptibly()

        val cached = cache
        if (cached != null) {
            singlePermit.release()
            return cached
        }

        inProgress = source
                .doOnComplete(::onSuccess)
                .doOnTerminate(::onTermination)
                .let {
                    when (retries) {
                        null -> it.replay()
                        else -> it.replay(retries)
                    }
                }
                .autoConnect()

        return inProgress!!
    }

    private fun onSuccess() {
        cache = inProgress
    }

    private fun onTermination() {
        inProgress = null
        singlePermit.release()
    }

}

fun <T> Flowable<T>.onErrorRetryCache(retries: Int? = null) = OnErrorRetryCache(this, retries).deferred

And a quick test to prove how it works:

@Test
fun `when source fails for the first time, new observables just resubscribe`() {

    val cacheSize = 2
    val error = Exception()
    var shouldFail = true //only fail on the first subscription

    val observable = Flowable.defer {
        when (shouldFail) {
            true -> Flowable.just(1, 2, 3, 4)
                    .doOnNext { shouldFail = false }
                    .concatWith(Flowable.error(error))
            false -> Flowable.just(5, 6, 7, 8)
        }
    }.onErrorRetryCache(cacheSize)

    val test1 = observable.test()
    val test2 = observable.test()
    val test3 = observable.test()

    test1.assertValues(1, 2, 3, 4).assertError(error) //fails the first time
    test2.assertValues(5, 6, 7, 8).assertNoErrors() //then resubscribes and gets whole stream from source
    test3.assertValues(7, 8).assertNoErrors() //another subscriber joins in and gets the 2 last cached values

}
0
votes

Have you considered using AsyncSubject to implement a cache for network requests? I made an example application, RxApp, to test how it could work. I use a singleton model to get the response from the network. This makes it possible to cache responses, access the data from multiple Fragments, subscribe to a pending request, and provide mock data for automated UI tests.