
My use case is to receive Kafka messages, make multiple attempts at a REST call and, once the attempts are exhausted, dump the failed message onto a Kafka DLQ topic.

    @StreamListener(EventSource.SOME_CHANNEL)
    public void processMessage(Message<?> unsolicitedMessage) {
         String aString = .....   
         oneService.act(aString);      
    }

@Retryable is working perfectly in terms of handling the logic for multiple attempts.

    @Retryable(value = {OneException.class, TwoException.class}, maxAttempts = 3,
    backoff = @Backoff(delay = 1000))
    public boolean act(String message, String endPoint) {
         //do stuff
    }

For Spring Cloud Stream's built-in Kafka DLQ publishing to kick in (enableDlq: true), the exception needs to bubble up to the @StreamListener annotated method so that the Kafka binder can publish the failed message.

However, in doing so, I am not able to leverage the @Recover annotated method, where the flow perfectly lands after retrying:

    @Recover
    public boolean recoverOnToDLQ(OneException ex, String message, String endPoint) {
        throw ex; // Required for StreamListener Kafka DLQ to kick in!
    }

Question: Is there a way to trigger Kafka DLQ publishing from within the @Recover method without re-throwing the exception?

If I use @Recover only to rethrow, I don't believe I'm making efficient use of the tighter control it offers. Publishing from the recover method would also simplify the unit test cases and capture the logic better at code level. Are there any thoughts on how to handle this better?

I am on all the latest versions for spring-cloud, spring-cloud-stream and spring-retry as of this date.

1 Answer

It can be done, but the question is "why would you want to?".

The binder has retry built in; simply throw the exception back to the binder and it will send the data to the DLQ after retries are exhausted. Retry is configured using binder consumer retry properties. You would not need one line of additional code.

By using @Retryable, you are nesting two RetryTemplates (unless you disable the binder's retry by setting the consumer maxAttempts property to 1).
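To make the nesting concrete, here is a minimal plain-Java sketch (no Spring involved) that counts how often an always-failing call runs when one retry loop wraps another. The class and the helper names `withRetry` and `countCalls` are hypothetical stand-ins for the binder's retry and for @Retryable; it assumes both layers retry on every exception and that maxAttempts is at least 1.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NestedRetryDemo {

    /** Minimal stand-in for a retry template: re-runs the action until it succeeds or attempts run out. */
    public static void withRetry(int maxAttempts, Runnable action) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return; // success: stop retrying
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // attempts exhausted: propagate the last failure
    }

    /** Counts how often an always-failing call is invoked when two retry layers are nested. */
    public static int countCalls(int binderMaxAttempts, int retryableMaxAttempts) {
        AtomicInteger calls = new AtomicInteger();
        try {
            withRetry(binderMaxAttempts, () ->          // outer layer: stands in for the binder's retry
                withRetry(retryableMaxAttempts, () -> { // inner layer: stands in for @Retryable
                    calls.incrementAndGet();
                    throw new IllegalStateException("REST call failed");
                }));
        } catch (RuntimeException expected) {
            // in the real application, this is where the binder would publish to the DLQ
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countCalls(3, 3)); // 9: both layers at 3 attempts
        System.out.println(countCalls(1, 3)); // 3: binder retry disabled (maxAttempts = 1)
    }
}
```

With the binder's consumer maxAttempts left at 3 and @Retryable also at 3, a persistently failing REST call would be attempted 9 times before the message reaches the DLQ, which is probably not what was intended.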

Of course, you can configure your own DLQ destination and simply write whatever you want to it in your recoverer. But to use the binder's built-in DLQ publisher, you'd have to craft a special ErrorMessage (with the properties the publisher needs) and send it to the binding's error channel. The publisher needs the raw Kafka ConsumerRecord, which you would have to re-create since it's not available to the listener.
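The "own DLQ destination" option can be sketched in plain Java as follows. This is only an illustration of the control flow, not Spring code: `DlqPublisher`, `actWithRecovery` and the topic name `some-topic.dlq` are hypothetical, and in a real application the publisher would be whatever producer you configure for your DLQ topic. It assumes every RuntimeException is retryable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RecoverToOwnDlqDemo {

    /** Hypothetical stand-in for your own DLQ producer. */
    public interface DlqPublisher {
        void publish(String topic, String payload);
    }

    /**
     * Tries the call up to maxAttempts times; once attempts are exhausted it
     * publishes the message to the DLQ topic and returns false instead of rethrowing.
     */
    public static boolean actWithRecovery(String message, int maxAttempts,
                                          Predicate<String> restCall,
                                          DlqPublisher dlq, String dlqTopic) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return restCall.test(message); // the "do stuff" step
            } catch (RuntimeException e) {
                // swallow and retry; a real implementation would log and back off here
            }
        }
        dlq.publish(dlqTopic, message); // recovery path: no exception reaches the listener
        return false;
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        DlqPublisher dlq = (topic, payload) -> published.add(topic + ":" + payload);

        boolean ok = actWithRecovery("order-42", 3,
                m -> { throw new IllegalStateException("REST call failed"); },
                dlq, "some-topic.dlq"); // hypothetical topic name

        System.out.println(ok);        // false
        System.out.println(published); // [some-topic.dlq:order-42]
    }
}
```

Because nothing is rethrown, the binder sees the message as processed successfully, so its own retry and DLQ handling never come into play on this path.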

All in all, for your use case, it seems much simpler to just use the binder's retry config.

EDIT

In 2.0.x you can add a RetryTemplate @Bean to your application and it will be used for all consumer bindings (overriding the binding properties).

This template can be customized with any retry policy, retryable exceptions, etc.

In 2.0.2 it will have to be qualified with @StreamRetryTemplate; this was a fix because any RetryTemplate overrides the properties, which may not be the desired behavior.