I am trying to write kafka consumer using spring-kafka version 2.3.0.M2 library. To handle run time errors I am using SeekToCurrentErrorHandler.class with DeadLetterPublishingRecoverer as my recoverer. This works fine only when my consumer code throws exception, but fails when unable to deserialize the message.
I tried implementing ErrorHandler myself and I was successful but with this approach I myself end up writing DLT code to handle error messages which I do not want to do.
Below are my kafka properties
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
auto-offset-reset: latest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.json.trusted.packages: com.mypackage
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}