I am trying to write a resilient Kafka consumer. If an exception occurs while the listener method is processing a message, I would like to retry it: a few times for certain exceptions, indefinitely for others, and never for the rest. I have read the Spring docs on SeekToCurrentErrorHandler, but I am not 100% sure how to implement this.

I have subclassed ExceptionClassifierRetryPolicy and return the appropriate RetryPolicy based on the exception that occurs in the listener method.

I have created a RetryTemplate and set its RetryPolicy to my custom subclass.

I have set the retryTemplate on the Kafka container factory. I have also set the error handler to a new SeekToCurrentErrorHandler and set the stateful retry property to true.

Listener Method

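@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgment ack) {
    SomeAvroGeneratedClass obj = message.getPayload();
    // Any exception thrown here propagates to the container's retry and error handling
    processIncomingMessage(obj);
    // Acknowledge (commit the offset) only after processing succeeds
    ack.acknowledge();
}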

Custom Retry Policy class

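import javax.annotation.PostConstruct;

import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.stereotype.Component;

@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy {

    @PostConstruct
    public void init() {
        // Fallback policy: up to 8 attempts for exceptions not classified below
        final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(8);

        this.setExceptionClassifier(classifiable -> {
            if (classifiable.getCause() instanceof TransientDataAccessException) {
                // Always retry when the cause is a TransientDataAccessException
                return new AlwaysRetryPolicy();
            } else if (classifiable.getCause() instanceof NonTransientDataAccessException) {
                // Never retry when the cause is a NonTransientDataAccessException
                return new NeverRetryPolicy();
            } else {
                return simpleRetryPolicy;
            }
        });
    }
}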
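One detail worth checking in the classifier: it inspects only getCause(), which is null when the exception reaches the classifier unwrapped, so such an exception falls through to the limited-retry branch. As a minimal sketch of the alternative, here is a plain-Java classifier that checks the throwable itself and then walks its cause chain. It does not use Spring: TransientFailure, NonTransientFailure and the Decision enum are hypothetical stand-ins for TransientDataAccessException, NonTransientDataAccessException and the three retry policies, introduced only so the example compiles on its own.

```java
final class RetryDecisionSketch {

    /** Stand-in for the three policies: AlwaysRetryPolicy, NeverRetryPolicy, SimpleRetryPolicy. */
    enum Decision { ALWAYS_RETRY, NEVER_RETRY, LIMITED_RETRY }

    // Hypothetical stand-in for Spring's TransientDataAccessException.
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) { super(message); }
    }

    // Hypothetical stand-in for Spring's NonTransientDataAccessException.
    static class NonTransientFailure extends RuntimeException {
        NonTransientFailure(String message) { super(message); }
    }

    /**
     * Checks the throwable itself and then each cause in turn, returning the
     * decision for the first match. The depth limit guards against cyclic chains.
     */
    static Decision classify(Throwable thrown) {
        int depth = 0;
        for (Throwable t = thrown; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof TransientFailure) {
                return Decision.ALWAYS_RETRY;
            }
            if (t instanceof NonTransientFailure) {
                return Decision.NEVER_RETRY;
            }
        }
        // Nothing matched (or the throwable was null): fall back to limited retries.
        return Decision.LIMITED_RETRY;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("listener failed", new TransientFailure("db timeout"));
        System.out.println(classify(wrapped));
        System.out.println(classify(new NonTransientFailure("constraint violation")));
        System.out.println(classify(new IllegalStateException("unexpected")));
    }
}
```

In the real policy the same loop would sit inside the lambda passed to setExceptionClassifier and return the policy objects instead of enum values.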

Retry Template and Container Config

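import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RetryConfig {

    // MyRetryPolicy is injected as a bean: its @PostConstruct init() would
    // never run for an instance created with new MyRetryPolicy().
    @Bean
    public RetryTemplate retryTemplate(ConcurrentKafkaListenerContainerFactory factory, MyRetryPolicy myRetryPolicy) {

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(myRetryPolicy);
        // Wait 2 seconds between attempts
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000L);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        factory.setRetryTemplate(retryTemplate);
        // ackOnError is a container property, not a factory property
        factory.getContainerProperties().setAckOnError(false);
        factory.setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
        );

        return retryTemplate;
    }
}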

Listener properties:

  1. AckMode = MANUAL
  2. enable.auto.commit = false

Question:

  1. With my current code, can I achieve the retry logic defined in MyRetryPolicy without causing a consumer rebalance when AlwaysRetryPolicy is returned? If not, please point me in the right direction.

  2. Is my approach of using both the error handler and the retry template correct?

1 Answer

Stateful retry in conjunction with a SeekToCurrentErrorHandler is the right approach.

However, if you are using a recent version (2.2 or higher), the error handler will give up after 10 attempts. You can make that unlimited by setting maxFailures to -1 (2.2) or by configuring the backOff with Long.MAX_VALUE attempts (2.3 or higher).
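As a rough sketch of that setting, assuming spring-kafka 2.3 or higher and the container factory from the question, the handler could be given a FixedBackOff with unlimited attempts. The class name, the method name and the zero-millisecond interval are illustrative choices, not something the question prescribes.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class ErrorHandlerConfigSketch {

    // Hypothetical helper; call it wherever the container factory is configured.
    static void configureUnlimitedRedelivery(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        // Interval 0: no extra delay in the error handler, on the assumption that
        // the RetryTemplate's own back-off policy already spaces out the attempts.
        // FixedBackOff.UNLIMITED_ATTEMPTS is Long.MAX_VALUE, so the handler never gives up.
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));

        // On spring-kafka 2.2 the equivalent uses the maxFailures constructor:
        // factory.setErrorHandler(new SeekToCurrentErrorHandler(-1));
    }
}
```

With unlimited attempts in the error handler, the decision to stop retrying rests entirely with the retry policy and the recovery callback.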