1
votes

I try to handle the exceptions that happen in my spring integration flow.

The flow is as follows:

source -> split -> handle -> handle

My source gives a List of objects as payload. The split emits one element at a time.

The 1st handler encounters exception. I was expecting that since the exception is being published to the error channel configured in below samples, the other elements in the list will continue to be emitted. But after the first exception the flow stops!!! Is there a configuration that I am missing ?

    @Bean
    public IntegrationFlow pubSubFlow(PublishSubscribeChannel publishSubscribeChannel,
            @Qualifier("myMessagePublishingErrorHandler") MessagePublishingErrorHandler messagePublishingErrorHandler) {

        return flow -> flow
                .channel(publishSubscribeChannel)
                .publishSubscribeChannel(config -> config
                        .subscribe(f1 -> f1
                                .split()
                                .handle("action", "act")
                                .handle(m1 -> System.out.println(">>>" + m1)))
                        .subscribe(f1 -> f1
                                .split()
                                .handle(m1 -> System.out.println("<<<" + m1)))
                        .errorHandler(messagePublishingErrorHandler));
    }

The error handler:

@Bean
    public MessagePublishingErrorHandler myMessagePublishingErrorHandler(@Qualifier("appErrorChannel") DirectChannel directChannel) {
        MessagePublishingErrorHandler messagePublishingErrorHandler = new MessagePublishingErrorHandler();
        messagePublishingErrorHandler.setDefaultErrorChannel(directChannel);
        return messagePublishingErrorHandler;
    }

    @Bean
    public DirectChannel appErrorChannel() {
        return new DirectChannel();
    }
    @Bean
    public IntegrationFlow errorFlow(@Qualifier("appErrorChannel") DirectChannel directChannel) {
        return IntegrationFlows.from(directChannel).handle(System.out::println).get();
    }
1

1 Answers

2
votes

I refactored your pubSubFlow code a bit to fix errors and make it much readable. Please, consider in the future to respect our effort to help you making the question as clean as possible.

So, your code doesn't reflect a description. You have .split() in each pub-sub branch, not in the main flow as you may would like.

You have a messagePublishingErrorHandler on the publishSubscribeChannel, so, any downstream errors are indeed handled there, but that is done for the message sent to that sub-flow. Since splitter is a part of that sub-flow (for one incoming message) it definitely stops working because it has just bubbled an error.

Please, reconsider what you want from the flow. And if you need to split only once and then pub-sub, then move .split() before publishSubscribeChannel().

However pay attention, please, that messagePublishingErrorHandler is going to work on the PublishSubscribeChannel if it is configured with an Executor.

Anyway there is always the way to place an ExecutorChannel just after splitter to handle item in parallel and don't impact the main splitting loop with errors.