I have the following snippet:

Observable.just(1)
    .flatMap(-> doSomething1)
    .timeout(10, SECONDS)
    .flatMap(-> doSomething2)
    .timeout(10, SECONDS)
    .flatMap(-> doSomething3)
    .timeout(10, SECONDS)
    .flatMap(-> doSomething4)
    .timeout(10, SECONDS)
    .subscribe();

I don't want to repeat the timeout after every flatMap. My first thought was to apply the timeout only at the beginning or the end of the stream, but that is not the behaviour I intend, since it only applies the timeout to the closest observable.

Observable.just(1)
    .flatMap(-> doSomething1)
    .flatMap(-> doSomething2)
    .flatMap(-> doSomething3)
    .flatMap(-> doSomething4)
    .timeout(10, SECONDS)
    .subscribe();

Observable.just(1)
    .timeout(10, SECONDS)
    .flatMap(-> doSomething1)
    .flatMap(-> doSomething2)
    .flatMap(-> doSomething3)
    .flatMap(-> doSomething4)
    .subscribe();

Each doSomethingX function runs some code when it is called, which can take a while, before it returns the next observable. The returned observable itself does not need to be wrapped in a timeout.

How can this be improved?

UPDATE:

A more practical example is below. The idea is to compose a stream that I can retry in case of failure or timeout. I'm simulating the scenario where one of the operators times out once, but works on a retry.

@Test
public void streamToBeSimplified() throws Exception {
    final AtomicBoolean retry = new AtomicBoolean(true);

    Action1<Object> print = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println(" >>>" + o);
        }
    };

    Observable.just(1)
            .doOnNext(print)
            .flatMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Integer integer) {
                    return Observable.just(2);
                }
            })
            .timeout(1, TimeUnit.SECONDS)
            .doOnNext(print)
            .flatMap(new Func1<Object, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Object o) {

                    if(retry.getAndSet(false)) {
                        try {
                            Thread.sleep(2000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return Observable.just(3);
                }
            })
            .timeout(1, TimeUnit.SECONDS)
            .doOnNext(print)
            .retry(2)
            .subscribe();

}
Is doSomethingX an Observable or a method call, as in doSomethingX()? - Hans Wurst
I've added a better example - Filipe Esperandio

1 Answer


You could create a helper method like this:

private Observable doThings() {
    return Observable.just(1)
        .flatMap(__ -> withTimeout(doSomething1, 10, TimeUnit.SECONDS))
        .flatMap(__ -> withTimeout(doSomething2, 10, TimeUnit.SECONDS));
        // etc
}

private static <T> Observable<T> withTimeout(Observable<T> observable, long time, TimeUnit timeUnit) {
    return observable
            .timeout(time, timeUnit);
}
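
One caveat with the helper above: it applies the timeout to an observable that already exists. If doSomethingX does slow work before it returns that observable, as the question describes, that work may not be covered. A possible variation, sketched here assuming RxJava 1.x, is to pass the call itself and wrap it in Observable.defer so that the timer is started before the call runs. The class name TimeoutPerStep and the slowOnce stand-in are hypothetical and only mimic the retry scenario from the question; the durations are shortened for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.functions.Func0;

public class TimeoutPerStep {

    // Hypothetical helper: takes the call that produces the observable instead of
    // the observable itself. defer() postpones that call until subscription, so
    // slow work done before the observable is returned also counts against the timeout.
    static <T> Observable<T> withTimeout(Func0<Observable<T>> step, long time, TimeUnit unit) {
        return Observable.defer(step).timeout(time, unit);
    }

    // Stand-in for a doSomethingX function (an assumption, not from the question):
    // blocks on the first call only, then returns immediately on later calls.
    static Observable<Integer> slowOnce(AtomicBoolean slow, int value) {
        if (slow.getAndSet(false)) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return Observable.just(value);
    }

    // Each step gets its own timeout; retry(2) resubscribes to the whole chain
    // when a step fails or times out.
    static Observable<Integer> stream(AtomicBoolean slow) {
        return Observable.just(1)
                .flatMap(i -> withTimeout(() -> Observable.just(2), 200, TimeUnit.MILLISECONDS))
                .flatMap(i -> withTimeout(() -> slowOnce(slow, 3), 200, TimeUnit.MILLISECONDS))
                .retry(2);
    }

    public static void main(String[] args) {
        // The first attempt times out in the second step; the retry succeeds.
        Integer result = stream(new AtomicBoolean(true)).toBlocking().single();
        System.out.println(" >>> " + result);
    }
}
```

As far as I can tell, the timeout here only signals the error downstream. In this sketch the call that is already blocking still runs to completion on its own thread, and its late result is simply dropped.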