
We are trying to use the DLT feature in Spring Kafka 2.6.x. This is the config yml:

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          auto-offset-reset: earliest
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          enable-auto-commit: false
          properties:
            isolation.level: read_committed
            fetch.max.wait.ms: 100
            spring.json.value.default.type: 'com.sample.entity.Event'
            spring.json.trusted.packages: 'com.sample.entity.*'
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        producer:
          bootstrap-servers: localhost:9092
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

And here is the KafkaConfig class:

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event>
    kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                                  ConsumerFactory<Object, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Event> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        return factory;
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
        return handler;
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations);
    }
}

The DLT works fine without the ConcurrentKafkaListenerContainerFactory bean, but since we want to scale the number of instances up and down, we want to use the ConcurrentKafkaListenerContainer.

What is the proper way to do this?

Also, I found that for a DeserializationException, the message sent to the .DLT topic is not proper JSON, whereas for an "UnprocessableException" (our custom exception, thrown within the listener), the message in the .DLT topic is proper JSON.


1 Answer


Since you are wiring up your own container factory, you have to set the error handler on it.
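A minimal sketch of what that could look like with Spring Kafka 2.6.x and Spring Boot, under a few assumptions: the factory is typed `<Object, Object>` so that it can be handed to `configurer.configure(...)`, and the concurrency value of 3 is an arbitrary illustration, not something taken from the question.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            SeekToCurrentErrorHandler errorHandler) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply Boot's spring.kafka.* settings and the consumer factory to this factory.
        configurer.configure(factory, consumerFactory);
        // Set the error handler explicitly so failed records go to the DLT.
        factory.setErrorHandler(errorHandler);
        // Illustrative value only (assumption): number of consumer threads per listener.
        factory.setConcurrency(3);
        return factory;
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        return new SeekToCurrentErrorHandler(recoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<Object, Object> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations);
    }
}
```

The factory in the question never receives the consumer factory or the error handler, which is why the DLT stops working once that bean is defined.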

> but since we want to scale the number of instances up and down, we want to use the ConcurrentKafkaListenerContainer.

Boot's auto-configuration already wires up a concurrent container factory with concurrency=1 (if there is no ....listener.concurrency property), so you can use Boot's factory instead of declaring your own.
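As a rough sketch of that alternative, the configuration class can drop the container factory bean and keep only the error handler and recoverer beans. This assumes Boot's auto-configured factory picks up a single `SeekToCurrentErrorHandler` bean on its own, which matches the observation in the question that the DLT works without the custom factory. The concurrency would then be raised through the listener concurrency property (assumed here to be `spring.kafka.listener.concurrency`).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class KafkaConfig {

    // No kafkaListenerContainerFactory bean: Boot's auto-configured
    // concurrent factory is used, with concurrency taken from properties.

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        // After retries are exhausted, the recoverer publishes the record to <topic>.DLT.
        return new SeekToCurrentErrorHandler(recoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<Object, Object> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations);
    }
}
```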

For deserialization exceptions (all exceptions), the record.value() is whatever came in originally. If that's not what you are seeing, please provide an example of what's in the original record and the DLT.