1
votes

We're currently using the Spring Cloud Stream to provide a SubscribableChannel for input and a MessageChannel for output, and we use Spring Integration to provide business logic between the two Kafka topics. In other Spring Integration applications, we have used the errorChannel header to dynamically provide the error handling channel (for example, changing the error channel when a certain change has occurred that will need to be rolled back on an error).

However, the input channel provided by Spring Cloud Stream does not appear to honor the errorChannel header. Is there a way to enable this?

Example code (please excuse typos--hand copied from another machine)

public interface TestStreams {
     @Input
    SubscribableChannel input();

    @Output
    MessageChannel output();
}
@SpringBootApplication
@EnableBinding(TestSTreams.class)
public class TestService {
    public static void main(String[] args) {
        SpringApplication.run(TestProcessorMicroservice.class, args);
    }

    @Bean
    public IntegrationFlow buildProcessingFlow(TestStreams streams, TransformerThatThrowsException transformer) {
        return IntegrationFlows.from(streams.input())
                .enrichHeaders(ImmutableMap.of(MessageHeaders.ERROR_CHANNEL, "errorChannel")
                .transform(transformer)
                .channel(streams.output())
                .get();
    }

    @Bean
    public IntegrationFlow buildErrorFlow() {
        return IntegrationFlows.from("errorChannel")
            .handle(new MyErrorHandler())
            .get();
    }
}

The code above runs properly, but when the transformer throws an exception, the error is handled by org.springframework.kafka.listener.LoggingErrorHandler instead of being routed to errorChannel and ultimately being handled by MyErrorHandler.

1

1 Answers

0
votes

It is not going to work that way because an exception is thrown to the KafkaListenerContainer with the mentioned LoggingErrorHandler by default. The errorChannel just doesn't have any value in that context. Because there is no Spring Integration initiator of the flow.

If you want to handle .transform(transformer) errors, you can consider to use an ExpressionEvaluatingRequestHandlerAdvice with its failureChannel functionality.

See its JavaDocs and Reference Manual for more info.

To add such an Advice into the flow, you need to add one more lambda argument for that transform(). For example:

.transform(transformer, e -> e.advice(myExpressionEvaluatingRequestHandlerAdvice())