I am using Spring Cloud Stream's DLQ feature with the Kafka binder. When message processing fails, the message is sent to the DLQ as expected. However, I want to modify the message sent to the DLQ so that it includes some extra diagnostic information. The problem is that the messages sent to the DLQ are the original messages; any mutations I have made are ignored. My approach so far has been to intercept the message before it is sent to the DLQ and add the extra information, which is stored in another bean. Specifically, I have tried these two approaches:
- Solution: implement a plain Kafka `ProducerInterceptor` for the DLQ. Problem: the implementation is instantiated outside of the Spring context, so I cannot inject the other beans I need. Spring Kafka has documented this solution; however, it requires creating a new `ProducerFactory` bean, which means I can't use the bean from the underlying Spring Cloud Stream.
- Solution: implement a Spring `ChannelInterceptor`. Problem: I cannot get a reference to the message channel for the DLQ, nor the underlying channel name, so I cannot configure the interceptor for DLQ messages only.
Any ideas on how this could be solved?