I am using Spring Cloud Stream's DLQ feature with the Kafka binder. When message processing fails, the message is sent to the DLQ as expected. However, I want to modify the message sent to the DLQ so that it includes some extra diagnostic information. The problem is that the messages sent to the DLQ are the original messages; any mutations I have made are ignored. My approach so far has been to intercept the message before it is sent to the DLQ and add the extra information, which is stored in another bean. Specifically, I have tried these two approaches:

  1. Solution: implement a plain Kafka ProducerInterceptor for the DLQ. Problem: the implementation is instantiated outside of the Spring context, so I cannot inject the other beans I need. Spring Kafka has documented this solution; however, it requires creating a new ProducerFactory bean, which means I can't use the bean from the underlying Spring Cloud Stream binder.
  2. Solution: implement a Spring ChannelInterceptor. Problem: I cannot get a reference to the message channel for the DLQ, nor the underlying channel name, so I cannot configure the interceptor for DLQ messages only.

Any ideas on how this could be solved?

1 Answer

Problem: I cannot get a reference to the message channel for the DLQ, nor the underlying channel name, so I cannot configure the interceptor for DLQ messages only.

The error channel name is

destinationName.group.errors

You can get it as an AbstractMessageChannel from the application context...

context.getBean("destinationName.group.errors", AbstractMessageChannel.class)

...and add the interceptor.
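
As a rough illustration of the lookup-and-intercept approach described above, here is a minimal sketch of a ChannelInterceptor that adds one header to every message passing through the channel. The class name DiagnosticsInterceptor, the header name x-diagnostics, and the Supplier that stands in for "another bean" are all hypothetical. Note that messages on this channel are error messages wrapping the failure, and whether a header added here ends up on the record the binder writes to the DLQ is not something this answer confirms, so treat it as a starting point to verify against your binder version.

```java
import java.util.function.Supplier;

import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

/** Hypothetical interceptor that copies the message and adds one diagnostic header. */
public class DiagnosticsInterceptor implements ChannelInterceptor {

    // Hypothetical header name; not defined by Spring or the binder.
    static final String DIAGNOSTICS_HEADER = "x-diagnostics";

    // Stands in for the bean that holds the extra diagnostic information.
    private final Supplier<String> diagnostics;

    public DiagnosticsInterceptor(Supplier<String> diagnostics) {
        this.diagnostics = diagnostics;
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // Messages are immutable, so build a copy that carries the extra header.
        return MessageBuilder.fromMessage(message)
                .setHeader(DIAGNOSTICS_HEADER, diagnostics.get())
                .build();
    }

    /** Registers the interceptor on an error channel obtained from the context. */
    public static void register(AbstractMessageChannel errorChannel,
                                Supplier<String> diagnostics) {
        errorChannel.addInterceptor(new DiagnosticsInterceptor(diagnostics));
    }
}
```

Because the interceptor is created by your own code rather than by the Kafka client, it can be given any Spring-managed collaborator, which avoids the injection problem of the plain ProducerInterceptor.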

Alternatively, don't use the binder's DLQ mechanism at all and add your own error handler:

@ServiceActivator(inputChannel = "destinationName.group.errors")
void errors(Message<?> error) {
    ...
}
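
One way the body of such a handler might look is sketched below, under several assumptions. The class name EnrichingErrorHandler, the header name x-exception-message, and the injected deadLetterOutput channel are hypothetical; how that channel is bound to a Kafka topic is left out. The sketch assumes the error payload is a MessagingException carrying the failed message, and it simply ignores anything else. The class also has to be registered as a Spring bean for the annotation to take effect.

```java
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/** Hypothetical error handler that enriches the failed message and forwards it. */
public class EnrichingErrorHandler {

    // Hypothetical header name; not defined by Spring or the binder.
    static final String EXCEPTION_HEADER = "x-exception-message";

    // Hypothetical channel that leads to your own dead-letter destination.
    private final MessageChannel deadLetterOutput;

    public EnrichingErrorHandler(MessageChannel deadLetterOutput) {
        this.deadLetterOutput = deadLetterOutput;
    }

    @ServiceActivator(inputChannel = "destinationName.group.errors")
    public void errors(Message<?> error) {
        Object payload = error.getPayload();
        if (!(payload instanceof MessagingException)) {
            return; // assumption: only MessagingException payloads are handled here
        }
        MessagingException failure = (MessagingException) payload;
        Message<?> failed = failure.getFailedMessage();
        if (failed == null) {
            return; // nothing to forward without the failed message
        }
        // Prefer the root cause thrown by the listener, if one is attached.
        Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
        Message<?> enriched = MessageBuilder.fromMessage(failed)
                .setHeader(EXCEPTION_HEADER, String.valueOf(cause.getMessage()))
                .build();
        deadLetterOutput.send(enriched);
    }
}
```

Since the handler builds the outgoing message itself, any diagnostic data held in other beans can be added as headers at this point.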