0
votes

I'm working on a project that involves Hystrix, and I decided to use RxJava. Now, forget Hystrix for the rest of this because I believe the main problem is with my complete screwing up of writing the Observable code correctly.

Need: I need a way to return an observable that represents a number of observables, each running a user task. I want that Observable to be able to return all results from the tasks, even errors.

Problem: Observable streams die on errors. If I have three tasks and the second task throws an exception, I never receive the third task even if it would have succeeded.

My Code:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return Observable
            .from(tasks)
            .flatMap(task -> {
                try {
                    return new MyCommand(task.getTaskId(),groupName,task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
                } catch(Exception ex) {
                    return Observable.error(ex);
                }
            });
}

Given that MyCommand is a class that extends HystrixObservableCommand, it returns an Observable and so shouldn't figure in on the problems I'm seeing.

Attempt 1:

Used Observable.flatMap as above

  • Good: Each Command is scheduled on it's own thread and the tasks run asynchronously.
  • Bad: On first Command exception, Observable completes having emitted previous successful results and emitting the Exception. Any in-flight Commands are ignored.

Attempt 2:

Used Observable.concatMapDelayError instead of flatMap

  • Bad: For some reason, tasks run synchronously. Why??
  • Good: I get all the successful results.
  • ~Good: OnError gets a Composite exception with a list of the exceptions thrown.

Any help will be greatly appreciated and probably result in me being very embarrassed for not having thought of it myself.

Additional Code

This test succeeds with Observable.flatMap, but fails when using Observable.concatMapDelayError because the tasks do not run asynchronously:

java.lang.AssertionError: Execution time ran over the 350ms limit: 608

@Test
public void shouldRunManagedAsyncTasksConcurrently() throws Exception {
    Observable<String> testObserver = executor.observeManagedAsync("asyncThreadPool",getTimedTasks()); 
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    long startTime = System.currentTimeMillis();
    testObserver.doOnError(throwable -> {
        System.out.println("error: " + throwable.getMessage());
    }).subscribe(testSubscriber);
    System.out.println("Test execution time: "+(System.currentTimeMillis()-startTime));
    testSubscriber.awaitTerminalEvent();
    long execTime = (System.currentTimeMillis()-startTime);
    System.out.println("Test execution time: "+execTime);
    testSubscriber.assertCompleted();
    System.out.println("Errors: "+testSubscriber.getOnErrorEvents());
    System.out.println("Results: "+testSubscriber.getOnNextEvents());
    testSubscriber.assertNoErrors();
    assertTrue("Execution time ran under the 300ms limit: "+execTime,execTime>=300);
    assertTrue("Execution time ran over the 350ms limit: "+execTime,execTime<=350);
    testSubscriber.assertValueCount(3);
    assertThat(testSubscriber.getOnNextEvents(),containsInAnyOrder("hello","wait","world"));
    verify(this.mockSchedulerFactory, times(3)).get("asyncThreadPool");
}

Tasks for the above unit test:

protected List<EspTask<String>> getTimedTasks() {
    EspTask longTask = new EspTask("helloTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(100);
            return "hello";
        }
    };
    EspTask longerTask = new EspTask("waitTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(150);
            return "wait";
        }

    };
    EspTask longestTask = new EspTask("worldTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.currentThread().sleep(300);
            return "world";
        }
    };
    return Arrays.asList(longTask, longerTask, longestTask);
}
3

3 Answers

1
votes

You can use Observable.onErrorReturn(), and return special value (e.g. null), then filter non-special values downstream. Keep in mind that source observable will complete on error. Also depending on use case Observable.onErrorResumeNext()methods can be useful aswell. If you are interested in error notifications, use Observable.materialize(), this will convert items and onError(), onComplete() into Notifications, which then can be filtered by Notification.getKind()

Edit. All operators mentioned above should be added right after .toObservable().subscribeOn(this.schedulerFactory.get(groupName)); assuming try/catch was absent.

1
votes

You want to use mergeDelayError:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return Observable.mergeDelayError(Observable
        .from(tasks)
        .map(task -> {
            try {
                return new MyCommand(task.getTaskId(),groupName,task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
            } catch(Exception ex) {
                return Observable.error(ex);
            }
        }));
}

Note that your MyCommand constructor should not throw any exceptions; this allows your code to be written more concisely:

public <T> Observable<T> observeManagedAsync(String groupName,List<EspTask<T>> tasks) {
    return from(tasks)
           .map(task -> new MyCommand(task.getTaskId(), groupName, task)
                        .toObservable()
                        .subscribeOn(this.schedulerFactory.get(groupName)))
           .compose(Observable::mergeDelayError);

}

Keep in mind that this will still invoke onError at most once; if you need explicit handling of all errors, use something like an Either<CommandResult, Throwable> as the return type (or handle the errors and return an empty observable).

1
votes

Use .materialize() to allow all emissions and errors to come through as wrapped notifications then deal with them as you wish:

     .flatMap(task -> {
            try {
                return new MyCommand(task.getTaskId(),groupName,task)
                    .toObservable()
                    .subscribeOn(this.schedulerFactory.get(groupName))
                    .materialize();
            } catch(Exception ex) {
                return Observable.error(ex).materialize();
            }
        });