I am using spring-kafka 2.2.6 with SeekToCurrentErrorHandler and ErrorHandlingDeserializer2. The SeekToCurrentErrorHandler is currently configured to log the message after three retries. Is there any way to skip retries for validation errors (caught by a Spring Validator implementation) and message conversion errors? All errors are intercepted by the container error handler, i.e. SeekToCurrentErrorHandler. Should I override the handle method of SeekToCurrentErrorHandler?
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((c, e) -> {
        LOG.info(e.getMessage());
    }, this.kafkaConfigProperties.getRetryCount()));
    return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.forEach((key, value) -> {
        map.put((String) key, value);
    });
    KafkaSoapMessageConverter kafkaSoapMessageConverter = new KafkaSoapMessageConverter();
    Map<String, Object> configMap = new HashMap<>(1);
    configMap.put(KafkaSoapMessageConverter.CLASS_TO_DESERIALIZE, MyClass.class);
    kafkaSoapMessageConverter.configure(configMap, false);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            kafkaSoapMessageConverter);
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}
EDIT
I have tried the following code in my overridden handle method:
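// Skip retries for deserialization and validation failures; retry everything else.
// instanceof is null-safe, so a missing cause no longer throws a NullPointerException.
if (e instanceof DeserializationException
        || e.getCause() instanceof MethodArgumentNotValidException) {
    // The skipper always returns true, so the failed record is skipped instead of being re-sought.
    // The lambda parameters are renamed because reusing "e" clashes with the enclosing variable and does not compile.
    SeekUtils.doSeeks(records, consumer, e, true, (rec, ex) -> true, LOG);
} else {
    super.handle(e, records, consumer, container);
}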
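For illustration, the check in the snippet above can be sketched without any spring-kafka types. This is only a rough sketch under assumptions: the class name NonRetryableClassifier and the method shouldSkipRetries are hypothetical helpers, not part of spring-kafka, and the walk over the whole cause chain (rather than only the first cause) is my own choice, since I am not sure how deeply the container wraps the listener exception.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of spring-kafka: decides whether an exception
// (or anything in its cause chain) is one of the configured non-retryable types.
final class NonRetryableClassifier {

    private final Set<Class<? extends Throwable>> nonRetryable;

    @SafeVarargs
    NonRetryableClassifier(Class<? extends Throwable>... types) {
        this.nonRetryable = new HashSet<>(Arrays.asList(types));
    }

    // Returns true when retries should be skipped for the given exception.
    boolean shouldSkipRetries(Throwable thrown) {
        int depth = 0;
        // The depth limit guards against a cyclic cause chain.
        for (Throwable t = thrown; t != null && depth < 32; t = t.getCause(), depth++) {
            for (Class<? extends Throwable> type : nonRetryable) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

In the overridden handle method, such a classifier would presumably be constructed with DeserializationException.class and MethodArgumentNotValidException.class, and its result would choose between the SeekUtils.doSeeks call and super.handle.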