I am currently using Spring Kafka to consume messages from topic along with @Retry of Spring. So basically, I am retrying to process the consumer message in case of an error. But while doing so, I want to avoid the exception message thrown by KafkaMessageListenerContainer
. Instead I want to display a custom message. I tried adding an error handler in the ConcurrentKafkaListenerContainerFactory
but on doing so, my retry does not get invoked.
Can someone guide me on how to display a custom exception message along with @Retry
scenario as well? Below are my code snippets:
ConcurrentKafkaListenerContainerFactory Bean Config
@Bean
ConcurrentKafkaListenerContainerFactory << ? , ? > concurrentKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory < Object, Object > kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory < Object, Object > kafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory < > ();
configurer.configure(kafkaListenerContainerFactory, kafkaConsumerFactory);
kafkaListenerContainerFactory.setConcurrency(1);
// Set Error Handler
/*kafkaListenerContainerFactory.setErrorHandler(((thrownException, data) -> {
log.info("Retries exhausted);
}));*/
return kafkaListenerContainerFactory;
}
Kafka Consumer
@KafkaListener(
topics = "${spring.kafka.reprocess-topic}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "concurrentKafkaListenerContainerFactory"
)
@Retryable(
include = RestClientException.class,
maxAttemptsExpression = "${spring.kafka.consumer.max-attempts}",
backoff = @Backoff(delayExpression = "${spring.kafka.consumer.backoff-delay}")
)
public void onMessage(ConsumerRecord < String, String > consumerRecord) throws Exception {
// Consume the record
log.info("Consumed Record from topic : {} ", consumerRecord.topic());
// process the record
messageHandler.handleMessage(consumerRecord.value());
}