I am trying to write a resilient Kafka consumer. If there occurs an exception while processing the message in the listener method, I would like to retry it. I would like to retry a few times for certain exceptions, always for certain exceptions and never for the rest. I have read the Spring docs regarding SeekToCurrentErrorHandler, but not 100% sure how to implement it.
I have subclassed the ExceptionClassifierRetryPolicy and and am returning the appropriate Retry Policy based on the exceptions that occur in the Listener method.
I have created RetryTemplate and have and set its RetryPolicy with my custom implementation in the subclass.
I have set the retryTemplate on the Kafka container. I have set the error handler to be new SeekToCurrentHandler and also set the stateful retry property to be true.
Listener Method
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
SomeAvroGeneratedClass obj = message.getPayLoad();
processIncomingMessage(obj);
ack.acknowledge();
}
Custom Retry Policy class
@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(8);
this.setExceptionClassifier( classifiable ->
{
// Always Retry when instanceOf TransientDataAccessException
if( classifiable.getCause() instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy();
}
else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
{
return new NeverRetryPolicy();
}
else
{
return simpleRetryPolicy;
}
} );
}}
Retry Template and Container Config
@Configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(2000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStateFulRetry(true);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
);
}
}
Listener properties:
- AckMode = Manual
- auto.offset.commit = false
Question:
With my current code, can I achieve my retry logic defined in MyRetryPolicy without causing a consumer rebalance when AlwaysRetryPolicy is returned? If no, please direct me in the right path.
Is my approach right in using the Error handler as well as the Retry?