4
votes

I'm trying to understand when should i use org.springframework.retry.RecoveryCallback and org.springframework.kafka.listener.KafkaListenerErrorHandler?

As of today, I'm using a class (implements org.springframework.retry.RecoveryCallback) to log error message and send the message to DLT and it's working. For sending a message to DLT, I'm using Spring KafkaTemplate and then I came across KafkaListenerErrorHandler and DeadLetterPublishingRecoverer. Now, can you please suggest me, how should i use KafkaListenerErrorHandler and DeadLetterPublishingRecoverer? Can this replace the RecoveryCallback?

Here is my current kafkaListenerContainerFactory code

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(recoveryCallback);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setConcurrency(1);  
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;   }
2

2 Answers

4
votes

If it's working as you want now, why change it?

There are several layers and you can choose which one to do the error handling, depending on your needs.

  • KafkaListenerErrorHandler would be invoked for each delivery attempt within the retry, so you typically won't use it with retry.
  • Retry RecoveryCallback is invoked after retries are exhausted (or immmediately if you have classified an exception as not retryable).
  • ErrorHandler - is in the container and is invoked if any listener throws an exception, not just @KafkaListeners.

With recent versions of the framework you can completely replace listener level retry with a SeekToCurrentErrorHandler configured with a DeadLetterPublishingRecoverer and a BackOff.

The DeadLetterPublishingRecoverer is intended for use in a container error handler since it needs the raw ConsumerRecord<?, ?>.

The KafkaListenerErrorHandler only has access to the spring-messaging Message<?> that is converted from the ConsumerRecord<?, ?>.

3
votes

To add on to the excellent context from @GaryRussell, this is what i am currently using:

I am handling any errors(a.k.a exception) like this:

factory.setErrorHandler(new SeekToCurrentErrorHandler(
    new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 0L)));

And to print this error, i have a listener on the .DLT and i am printing the exception stack trace that is stored in the header like so:

@KafkaListener(id = "MY_ID", topics = MY_TOPIC + ".DLT")
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
            
    logger.error(exceptionStackTrace);
}

Note:
I am using logger.error, because i am redirecting all error messages to an error log file that is being monitored.

BONUS:
If you set the following: logging.level.org.springframework.kafka=DEBUG

You will see this in your console/log:

xxx [org.springframework.kafka.KafkaListenerEndpointContainer#7-2-C-1] DEBUG o.s.k.listener.SeekToCurrentErrorHandler - Skipping seek of: ConsumerRecord xxx
xxx [kafka-producer-network-thread | producer-3] DEBUG o.s.k.l.DeadLetterPublishingRecoverer - Successful dead-letter publication: SendResult xxx

If you have a better way to log, i would appreciate your comment. Thanks!

Cheers