I have a Spring Boot app configured with spring-kafka, and I want to handle every kind of error that can occur while listening to a topic. If a message cannot be consumed, whether because of a deserialization failure or any other exception, it should be retried twice, after which the message should be logged to an error file. I see two possible approaches:
First approach (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):
@Autowired
KafkaTemplate<String, Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    // Publish records that still fail after the retries to "<topic>.DLT", same partition.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
            (r, e) -> {
                if (e instanceof FooException) {
                    return new TopicPartition(r.topic() + ".DLT", r.partition());
                }
                // The lambda must return on every path; routing all other
                // exceptions to the same .DLT topic is an assumption.
                return new TopicPartition(r.topic() + ".DLT", r.partition());
            });
    // FixedBackOff(interval = 0 ms, maxAttempts = 2): retry twice with no delay.
    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    factory.setErrorHandler(errorHandler);
    return factory;
}
This, however, requires an additional topic (a new .DLT topic) and a listener on it, from which we can then log the failed record to a file:
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
        @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
    logger.error(exceptionStackTrace);
}
Second approach (using a custom SeekToCurrentErrorHandler):
@Bean
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource()
            .getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
    // The original snippet was missing this return statement.
    return retryTemplate;
}
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }
}
Can anyone suggest what the standard way to implement this kind of feature is? The first approach has the overhead of creating .DLT topics and an additional @KafkaListener. In the second approach, we can log the failed consumer record and its exception directly.
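To make the requirement concrete, here is a minimal plain-Java sketch of the behaviour I am after: one initial delivery, up to two retries, and then a recovery callback that records the failure. It deliberately does not use spring-kafka; the names RetryThenRecover, deliver and errorLog are hypothetical and exist only for this illustration, and the in-memory list stands in for the error file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class RetryThenRecover {

    /**
     * Hands the record to the listener; on failure, retries up to maxRetries
     * more times. If every attempt fails, the recoverer is called once with
     * the record and the last exception. Returns the number of deliveries made.
     */
    public static <R> int deliver(R record, Consumer<R> listener, int maxRetries,
                                  BiConsumer<R, Exception> recoverer) {
        int attempts = 0;
        Exception last = null;
        while (attempts <= maxRetries) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // processed successfully, nothing to recover
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // Retries exhausted: let the recoverer log (or publish) the record.
        recoverer.accept(record, last);
        return attempts;
    }

    public static void main(String[] args) {
        // Stand-in for the error file; a real setup would use a file appender.
        List<String> errorLog = new ArrayList<>();
        int attempts = deliver("bad-payload",
                r -> { throw new IllegalStateException("cannot process " + r); },
                2,
                (r, e) -> errorLog.add("FAILED record=" + r + " cause=" + e.getMessage()));
        System.out.println(attempts + " attempts, log=" + errorLog);
    }
}
```

With maxRetries set to 2, a record that always fails is delivered three times in total before the recoverer runs, which is the outcome either approach above would need to reproduce.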