I'm working on a very simple Spring Boot app that consumes messages from a single topic and persists each batch to a Cassandra database. I'm trying to use Spring Kafka with a batch listener. Whenever my onMessage() method throws a MyKafkaRetryException, an endless loop of reprocessing starts, but I expect the batch of messages to be moved to the DLT once the retries are exhausted. (If I throw any other exception, a message is moved to the DLT immediately.) I have only configured a RecoveringBatchErrorHandler with my own exception classification, so I don't know what's wrong here.

    @Bean
    public RecoveringBatchErrorHandler recoveringBatchErrorHandler(
            RetryProperties retryProperties,
            ConsumerRecordRecoverer consumerRecordRecoverer)
    {
        ExponentialBackOff backOff = new ExponentialBackOff(
                retryProperties.getInitialInterval().toMillis(),
                retryProperties.getMultiplier());
        backOff.setMaxInterval(retryProperties.getMaxInterval().toMillis());
        backOff.setMaxElapsedTime(retryProperties.calculateMaxElapsedTime().toMillis());

        RecoveringBatchErrorHandler errorHandler =
                new RecoveringBatchErrorHandler(consumerRecordRecoverer, backOff);
        errorHandler.setClassifications(
                ImmutableMap.of(MyKafkaRetryException.class, true),
                false);
        return errorHandler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            RecoveringBatchErrorHandler recoveringBatchErrorHandler)
    {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(recoveringBatchErrorHandler);
        return factory;
    }
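
To make it concrete what I mean by "retries are exhausted": as far as I understand Spring's ExponentialBackOff, it keeps handing out intervals (multiplied each time and capped at the max interval) until their sum reaches the max elapsed time, and then signals a stop. Below is a rough plain-Java sketch of that arithmetic. It is only my approximation, not the Spring class itself; the class name BackOffScheduleSketch, the schedule method, and the numbers in main are made-up placeholders, not my real RetryProperties values.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffScheduleSketch {

    // Returns the sleep intervals (in ms) handed out before the back-off would stop.
    // Assumes positive intervals; this approximates the behaviour, it is not Spring code.
    public static List<Long> schedule(long initialInterval, double multiplier,
                                      long maxInterval, long maxElapsedTime) {
        List<Long> intervals = new ArrayList<>();
        long current = -1;  // no interval handed out yet
        long elapsed = 0;   // sum of the intervals handed out so far
        while (elapsed < maxElapsedTime) {
            if (current < 0) {
                // first retry uses the initial interval (capped)
                current = Math.min(initialInterval, maxInterval);
            } else if (current < maxInterval) {
                // later retries grow by the multiplier, capped at maxInterval
                current = Math.min((long) (current * multiplier), maxInterval);
            }
            elapsed += current;
            intervals.add(current);
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Placeholder values: 1 s initial, x2 multiplier, 10 s cap, 30 s budget
        System.out.println(schedule(1000L, 2.0, 10000L, 30000L));
    }
}
```

With these placeholder values the sketch prints [1000, 2000, 4000, 8000, 10000, 10000], so I would expect roughly six retries and then a hand-off to the recoverer, not an endless loop.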

I'm using Spring Kafka 2.6.9 and JDK 1.8_261 on OS X.