
I'm working on a simple Spring Boot app that processes messages from a single topic and persists each batch into a Cassandra database. I'm using Spring Kafka with batch listeners. Every time my onMessage() method throws a MyKafkaRetryException, an endless reprocessing loop starts, but I expect the batch to be moved to the DLT once the retries are exhausted. (If I throw any other exception, the message is immediately moved to the DLT.) I configured a RecoveringBatchErrorHandler with my own exception classification, so I don't know what's wrong here.

    @Bean
    public RecoveringBatchErrorHandler recoveringBatchErrorHandler(
        RetryProperties retryProperties,
        ConsumerRecordRecoverer consumerRecordRecoverer)
    {
        ExponentialBackOff backOff = new ExponentialBackOff(retryProperties.getInitialInterval().toMillis(), retryProperties.getMultiplier());
        backOff.setMaxInterval(retryProperties.getMaxInterval().toMillis());
        backOff.setMaxElapsedTime(retryProperties.calculateMaxElapsedTime().toMillis());
        
        RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(consumerRecordRecoverer, backOff);
        // Retry only MyKafkaRetryException; treat all other exceptions as
        // non-retryable (defaultValue = false), i.e. recover immediately.
        errorHandler.setClassifications(
            ImmutableMap.of(
                MyKafkaRetryException.class, true),
            false);

        return errorHandler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            RecoveringBatchErrorHandler recoveringBatchErrorHandler)
    {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.setBatchListener(true);
        factory.setBatchErrorHandler(recoveringBatchErrorHandler);
        return factory;
    }

I'm using Spring Kafka 2.6.9 and JDK 1.8.0_261 on macOS.


1 Answer


The RecoveringBatchErrorHandler works in conjunction with the BatchListenerFailedException, which you must throw to indicate which record in the batch failed.

That way, the offsets of the prior records are committed and the remainder of the batch replayed.
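A minimal sketch of such a listener follows. The topic name, payload type, and persist() method are illustrative, not from your code; the key point is the third constructor argument, the index of the failed record within the batch:

```java
@KafkaListener(topics = "my-topic", containerFactory = "batchKafkaListenerContainerFactory")
public void onMessage(List<ConsumerRecord<String, MyMessage>> records) {
    for (int i = 0; i < records.size(); i++) {
        try {
            persist(records.get(i).value());
        }
        catch (Exception e) {
            // Tell the error handler which record failed: offsets before
            // index i are committed and the remainder of the batch is retried.
            throw new BatchListenerFailedException("failed to persist record", e, i);
        }
    }
}
```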

Any other exception will cause the entire batch to be redelivered indefinitely, because the handler delegates to a SeekToCurrentBatchErrorHandler, which has no recovery mechanism since it doesn't know which record failed. The provided BackOff is used, but when it is exhausted, the last back-off interval is reused for all further retries.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh

Alternatively, you can use a RetryingBatchErrorHandler, which retries the whole batch and sends the entire batch to the DLT when retries are exhausted.
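A sketch of that configuration, assuming a KafkaTemplate bean is available for the dead-letter publisher (the back-off values are placeholders):

```java
@Bean
public RetryingBatchErrorHandler retryingBatchErrorHandler(KafkaTemplate<Object, Object> template) {
    // Publishes each record of the failed batch to the DLT after retries are exhausted.
    ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry the whole batch up to 3 times, 1 second apart, then recover.
    return new RetryingBatchErrorHandler(new FixedBackOff(1000L, 3L), recoverer);
}
```

Wire it into the container factory with factory.setBatchErrorHandler(...) just as in your existing configuration.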