
We are trying to use the dead-letter topic (DLT) feature in Spring Kafka 2.6.x. This is the config YAML:

  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      enable-auto-commit: false
      properties:
        isolation.level: read_committed
        fetch.max.wait.ms: 100
        spring.json.value.default.type: 'com.sample.entity.Event'
        spring.json.trusted.packages: 'com.sample.entity'
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

And here is the KafkaConfig class:

@EnableKafka
@Configuration
@Log4j2
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event>
    kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                                  ConsumerFactory<Object, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Event> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
        handler.addNotRetryableExceptions(UnprocessableException.class);
        return handler;
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<Object, Object> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations);
    }
}

It works fine without the custom ConcurrentKafkaListenerContainerFactory, but since we want to scale the number of instances up and down, we want to use a concurrent listener container.

What is the proper way to do this?

Also, I found that when a DeserializationException occurs, the message in the .DLT topic is not sent properly (it is not valid JSON), while when it is an UnprocessableException (our custom exception, thrown inside the listener), the message in the .DLT topic is valid JSON.


1 Answer


Since you are wiring up your own container factory, you have to set the error handler on it yourself; your bean never uses the injected configurer, so Boot's auto-configuration (which would apply the SeekToCurrentErrorHandler bean for you) is bypassed.

but since we want to scale the number of instances up and down, we want to use a concurrent listener container.

Boot's auto-configuration wires up a concurrent container with concurrency=1 when there is no spring.kafka.listener.concurrency property, so you can keep using Boot's factory and simply set that property to scale the listener.

For deserialization exceptions (indeed, for all exceptions), the record.value() published to the DLT is whatever came in originally; a payload that failed JSON deserialization is therefore forwarded to the DLT as the same invalid JSON, which matches what you describe. If that's not what you are seeing, please provide an example of what's in the original record and in the DLT.