I have the following snippet:
Observable.just(1)
    .flatMap(x -> doSomething1(x))
    .timeout(10, SECONDS)
    .flatMap(x -> doSomething2(x))
    .timeout(10, SECONDS)
    .flatMap(x -> doSomething3(x))
    .timeout(10, SECONDS)
    .flatMap(x -> doSomething4(x))
    .timeout(10, SECONDS)
    .subscribe();
I don't want to repeat myself by adding a timeout after every flatMap. My first thought was to apply timeout only once, at the beginning or at the end of the stream, but that is not the behaviour I intend, because the timeout then only applies to the closest observable.
Observable.just(1)
    .flatMap(x -> doSomething1(x))
    .flatMap(x -> doSomething2(x))
    .flatMap(x -> doSomething3(x))
    .flatMap(x -> doSomething4(x))
    .timeout(10, SECONDS)
    .subscribe();

Observable.just(1)
    .timeout(10, SECONDS)
    .flatMap(x -> doSomething1(x))
    .flatMap(x -> doSomething2(x))
    .flatMap(x -> doSomething3(x))
    .flatMap(x -> doSomething4(x))
    .subscribe();
Each doSomethingX function runs some code when it is called, and that code can take a while before the function returns the next observable. The returned observable itself does not need to be wrapped in a timeout.
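To make the shape of these functions concrete, here is a minimal sketch that uses only the JDK, with CompletableFuture standing in for the returned observable. The class name SlowOnCall, the doSomething signature, and the delay parameter are all hypothetical and exist only for illustration; my real functions return RxJava observables.

```java
import java.util.concurrent.CompletableFuture;

public class SlowOnCall {
    // Hypothetical stand-in for doSomethingX: the delay happens during the call
    // itself, before any asynchronous value exists for a timeout to wrap.
    static CompletableFuture<Integer> doSomething(int input, long delayMillis) {
        try {
            Thread.sleep(delayMillis); // simulated slow work on the calling thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The returned value is already complete, so it needs no timeout of its own.
        return CompletableFuture.completedFuture(input + 1);
    }

    // Measures how long the call itself blocks, in milliseconds.
    static long callDurationMillis(long delayMillis) {
        long start = System.nanoTime();
        doSomething(1, delayMillis);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("call blocked for about " + callDurationMillis(200) + " ms");
        System.out.println("result ready immediately: " + doSomething(1, 0).isDone());
    }
}
```

The point of the sketch is that all of the waiting happens inside the call, which is why I want each step to be covered by its own timeout.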
How can this be improved?
UPDATE:
A more practical example is below. The idea is to compose a stream that I can retry in case of failure or timeout. I'm simulating the scenario where one of the operators times out once, but works on a retry.
@Test
public void streamToBeSimplified() throws Exception {
    // True only for the first attempt, so the slow path runs exactly once.
    final AtomicBoolean retry = new AtomicBoolean(true);

    // Prints every item that passes through the stream.
    Action1<Object> print = new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println(" >>>" + o);
        }
    };

    Observable.just(1)
            .doOnNext(print)
            .flatMap(new Func1<Integer, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Integer integer) {
                    return Observable.just(2);
                }
            })
            .timeout(1, TimeUnit.SECONDS)
            .doOnNext(print)
            .flatMap(new Func1<Object, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(Object o) {
                    // Simulate slow work on the first attempt only.
                    if (retry.getAndSet(false)) {
                        try {
                            Thread.sleep(2000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    return Observable.just(3);
                }
            })
            .timeout(1, TimeUnit.SECONDS)
            .doOnNext(print)
            .retry(2)
            .subscribe();
}