When multiple onErrorContinue added to the pipeline to handle specific type of exception thrown from flatMap, the exception handling is not working as expected.
The below code, I expect, the elements 1 to 6 should be dropped and element 7 to 10 should be consumed by the subscriber.
public class FlatMapOnErrorContinueExample {
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.flatMap(number -> {
if (number <= 3) {
return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
} else if (number > 3 && number <= 6) {
return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
} else {
return Mono.just(number);
}
})
.onErrorContinue(NumberLesserThanThree.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))
.onErrorContinue(NumberLesserThanSixButGretherThan3.class,
(throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))
.onErrorContinue((throwable, object) ->
System.err.println("Exception: " + throwable.getMessage()))
.subscribe(number -> System.out.println("number is " + number),
error -> System.err.println("Exception in Subscription " + error.getMessage()));
}
public static class NumberLesserThanThree extends RuntimeException {
public NumberLesserThanThree(final String msg) {
super(msg);
}
}
public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
public NumberLesserThanSixButGretherThan3(final String msg) {
super(msg);
}
}
}
Here is the output what I am getting:
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6
Question: Why the 2nd onErrorContinue is not called but the exception send to subscriber?
Additional Note:
if i remove 1st and 2nd onErrorContinue, then all exception are handled by 3rd onErrorContinue. I could use this approach to receive all exception and check for the type of exception and proceed with handling. However, I would like to make it cleaner exception handling rather than adding if..else block.
How this question is different from Why does Thread.sleep() trigger the subscription to Flux.interval()?
1) This question about exception handling and the order of exception handling; The other question is about processing elements in parallel and making the main thread waiting for the all the element processing complete
3) This question dont have any concern about threading, even if add Thread.sleep(10000) after . subscribe, there is no change in behaviour.