I'm working on a project that uses Hystrix, and I decided to use RxJava with it. Set Hystrix aside for the rest of this question, though: I believe the real problem is that I'm not writing the Observable code correctly.
Need: I need to return a single Observable that combines a number of Observables, each running a user task. That Observable should deliver the outcome of every task, including errors.
Problem: An Observable stream terminates on its first error. If I have three tasks and the second task throws an exception, I never receive the third task's result, even if it would have succeeded.
My Code:
    public <T> Observable<T> observeManagedAsync(String groupName, List<EspTask<T>> tasks) {
        return Observable
            .from(tasks)
            .flatMap(task -> {
                try {
                    return new MyCommand(task.getTaskId(), groupName, task)
                        .toObservable()
                        .subscribeOn(this.schedulerFactory.get(groupName));
                } catch (Exception ex) {
                    return Observable.error(ex);
                }
            });
    }
MyCommand extends HystrixObservableCommand, so toObservable() simply returns an Observable, and the command itself shouldn't be a factor in the problems I'm seeing.
Attempt 1:
Used Observable.flatMap, as above.
- Good: Each Command is scheduled on its own thread and the tasks run asynchronously.
- Bad: On the first Command exception, the Observable terminates. It has emitted the earlier successful results and then emits the exception through onError. The results of any in-flight Commands are dropped.
Attempt 2:
Used Observable.concatMapDelayError instead of flatMap.
- Bad: For some reason, the tasks run synchronously, one after another. Why?
- Good: I get all the successful results.
- ~Good: onError receives a CompositeException containing the list of exceptions that were thrown.
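To make the goal concrete, here is a minimal sketch of the behaviour I'm after, written with plain JDK CompletableFuture rather than RxJava or Hystrix. The Outcome type and the runAll helper are hypothetical names made up for this illustration, not part of my project or of any library. Every task is started on the pool, and each one reports either a value or an error, so one failure doesn't hide the others.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only: plain JDK, not RxJava or Hystrix.
class CollectAllOutcomes {

    // Holds either the value a task returned or the exception it threw.
    static final class Outcome<T> {
        final T value;
        final Throwable error;

        Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    // Starts every task on the pool, then waits for all of them.
    // A failing task becomes an error Outcome instead of aborting the rest.
    static <T> List<Outcome<T>> runAll(List<Callable<T>> tasks, ExecutorService pool) {
        List<CompletableFuture<Outcome<T>>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return new Outcome<T>(task.call(), null);
                } catch (Exception ex) {
                    return new Outcome<T>(null, ex);
                }
            }, pool));
        }
        List<Outcome<T>> outcomes = new ArrayList<>();
        for (CompletableFuture<Outcome<T>> future : futures) {
            outcomes.add(future.join()); // collected in the order the tasks were given
        }
        return outcomes;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = Arrays.asList(
                () -> "hello",
                () -> { throw new IllegalStateException("second task failed"); },
                () -> "world");
            for (Outcome<String> outcome : runAll(tasks, pool)) {
                System.out.println(outcome.isError()
                    ? "error: " + outcome.error.getMessage()
                    : "value: " + outcome.value);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

With the three sample tasks in main, the third task's value is still reported even though the second one throws. That is the combination I'd like to get from a single Observable: concurrent execution as in Attempt 1, plus all results and all errors as in Attempt 2.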
Any help will be greatly appreciated and probably result in me being very embarrassed for not having thought of it myself.
Additional Code
This test succeeds with Observable.flatMap, but fails when using Observable.concatMapDelayError because the tasks do not run asynchronously:
java.lang.AssertionError: Execution time ran over the 350ms limit: 608
    @Test
    public void shouldRunManagedAsyncTasksConcurrently() throws Exception {
        Observable<String> testObserver = executor.observeManagedAsync("asyncThreadPool", getTimedTasks());
        TestSubscriber<String> testSubscriber = new TestSubscriber<>();
        long startTime = System.currentTimeMillis();
        testObserver
            .doOnError(throwable -> System.out.println("error: " + throwable.getMessage()))
            .subscribe(testSubscriber);
        // Time spent inside subscribe() itself, before waiting for the tasks to finish
        System.out.println("Time to subscribe: " + (System.currentTimeMillis() - startTime));
        testSubscriber.awaitTerminalEvent();
        long execTime = System.currentTimeMillis() - startTime;
        System.out.println("Test execution time: " + execTime);
        testSubscriber.assertCompleted();
        System.out.println("Errors: " + testSubscriber.getOnErrorEvents());
        System.out.println("Results: " + testSubscriber.getOnNextEvents());
        testSubscriber.assertNoErrors();
        // Run concurrently, the total should be close to the slowest task (300 ms)
        assertTrue("Execution time ran under the 300ms limit: " + execTime, execTime >= 300);
        assertTrue("Execution time ran over the 350ms limit: " + execTime, execTime <= 350);
        testSubscriber.assertValueCount(3);
        assertThat(testSubscriber.getOnNextEvents(), containsInAnyOrder("hello", "wait", "world"));
        verify(this.mockSchedulerFactory, times(3)).get("asyncThreadPool");
    }
Tasks for the above unit test:
    protected List<EspTask<String>> getTimedTasks() {
        EspTask longTask = new EspTask("helloTask") {
            @Override
            public Object doCall() throws Exception {
                Thread.sleep(100);
                return "hello";
            }
        };
        EspTask longerTask = new EspTask("waitTask") {
            @Override
            public Object doCall() throws Exception {
                Thread.sleep(150);
                return "wait";
            }
        };
        // Slowest task: it bounds the total time when all three run concurrently
        EspTask longestTask = new EspTask("worldTask") {
            @Override
            public Object doCall() throws Exception {
                Thread.sleep(300);
                return "world";
            }
        };
        return Arrays.asList(longTask, longerTask, longestTask);
    }
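For reference, the 300 to 350 ms window in the test comes straight from the sleep durations. Run concurrently, the three tasks should take about as long as the slowest one (300 ms). Run back to back, they take 100 + 150 + 300 = 550 ms plus overhead, which is in line with the 608 ms reported by the failing run. Below is a minimal plain-JDK sketch of that arithmetic, using an ExecutorService instead of RxJava or Hystrix; the class and method names are hypothetical and only stand in for my real tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: plain JDK stand-ins for the three timed tasks.
class TimingSketch {

    // Same sleep durations as the hello, wait and world tasks.
    static final long[] SLEEPS_MS = {100, 150, 300};

    // Expected wall-clock time when the tasks run one after another: the sum.
    static long expectedSequentialMs(long[] sleeps) {
        long total = 0;
        for (long ms : sleeps) {
            total += ms;
        }
        return total;
    }

    // Expected wall-clock time when the tasks all run at once: the longest one.
    static long expectedConcurrentMs(long[] sleeps) {
        long longest = 0;
        for (long ms : sleeps) {
            longest = Math.max(longest, ms);
        }
        return longest;
    }

    // Measures the sleeps executed back to back on the calling thread.
    static long timeSequentialMs(long[] sleeps) throws InterruptedException {
        long start = System.nanoTime();
        for (long ms : sleeps) {
            Thread.sleep(ms);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Measures the sleeps executed in parallel, one pool thread per task.
    static long timeConcurrentMs(long[] sleeps) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(sleeps.length);
        try {
            long start = System.nanoTime();
            List<Future<Void>> futures = new ArrayList<>();
            for (long ms : sleeps) {
                Callable<Void> task = () -> {
                    Thread.sleep(ms);
                    return null;
                };
                futures.add(pool.submit(task));
            }
            for (Future<Void> future : futures) {
                future.get(); // wait for every task to finish
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("expected sequential: " + expectedSequentialMs(SLEEPS_MS) + " ms"); // 550 ms
        System.out.println("expected concurrent: " + expectedConcurrentMs(SLEEPS_MS) + " ms"); // 300 ms
        System.out.println("measured sequential: " + timeSequentialMs(SLEEPS_MS) + " ms");
        System.out.println("measured concurrent: " + timeConcurrentMs(SLEEPS_MS) + " ms");
    }
}
```

The measured numbers vary from run to run, so the sketch only prints them. The point is the gap between the two expected values, which is what the assertions in my test rely on.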