We're currently using the Spring Cloud Stream to provide a SubscribableChannel
for input and a MessageChannel
for output, and we use Spring Integration to provide business logic between the two Kafka topics. In other Spring Integration applications, we have used the errorChannel
header to dynamically provide the error handling channel (for example, changing the error channel when a certain change has occurred that will need to be rolled back on an error).
However, the input channel provided by Spring Cloud Stream does not appear to honor the errorChannel
header. Is there a way to enable this?
Example code (please excuse typos--hand copied from another machine)
public interface TestStreams {
@Input
SubscribableChannel input();
@Output
MessageChannel output();
}
@SpringBootApplication
@EnableBinding(TestSTreams.class)
public class TestService {
public static void main(String[] args) {
SpringApplication.run(TestProcessorMicroservice.class, args);
}
@Bean
public IntegrationFlow buildProcessingFlow(TestStreams streams, TransformerThatThrowsException transformer) {
return IntegrationFlows.from(streams.input())
.enrichHeaders(ImmutableMap.of(MessageHeaders.ERROR_CHANNEL, "errorChannel")
.transform(transformer)
.channel(streams.output())
.get();
}
@Bean
public IntegrationFlow buildErrorFlow() {
return IntegrationFlows.from("errorChannel")
.handle(new MyErrorHandler())
.get();
}
}
The code above runs properly, but when the transformer throws an exception, the error is handled by org.springframework.kafka.listener.LoggingErrorHandler
instead of being routed to errorChannel
and ultimately being handled by MyErrorHandler
.