We are trying to use the DLT feature in Spring Kafka 2.6.x. This is the config yml:
kafka:
bootstrap-servers: localhost:9092
auto-offset-reset: earliest
consumer:
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
enable-auto-commit: false
properties:
isolation.level: read_committed
fetch.max.wait.ms: 100
spring.json.value.default.type: 'com.sample.entity.Event'
spring.json.trusted.packages: 'com.sample.entity.*'
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonDeserializer
And here is the KafkaConfig class:
@EnableKafka
@Configuration
@Log4j2
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event>
kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Event> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
handler.addNotRetryableExceptions(UnprocessableException.class);
return handler;
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations kafkaOperations) {
return new DeadLetterPublishingRecoverer(kafkaOperations);
}
}
It is fine without the ConcurrentKafkaListenerContainerFactory
, but since we want to scale up scale down the number of instances, we want to use the ConcurrentKafkaListenerContainer
.
What is the proper way to do this?
Also, I found that if it is Deserialisation Exception, the message in .DLT is not sent properly (not proper JSON), while if it is "UnprocessableException" (our custom exception that throws within listener) it is proper JSON in .DLT