3
votes

I am trying to write kafka consumer using spring-kafka version 2.3.0.M2 library. To handle run time errors I am using SeekToCurrentErrorHandler.class with DeadLetterPublishingRecoverer as my recoverer. This works fine only when my consumer code throws exception, but fails when unable to deserialize the message.

I tried implementing ErrorHandler myself and I was successful but with this approach I myself end up writing DLT code to handle error messages which I do not want to do.

Below are my kafka properties

spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9092
        group-id: group_id
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.mypackage
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
      ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
      configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}
1
Please provide more information than "it fails". Fails how?Gary Russell
I mean it throws deserialization exception when I send message in wrong format and program stops abruptly. Exception is as expected but I wanted it to captured by ErrorHandler and put message as is in DLT topicDeepak Naik
There is nothing in the framework that will "stop the program abruptly". I'll try to find some time to test this.Gary Russell
Please ignore the word "abruptly", it just throw exception but never stop the applicationDeepak Naik

1 Answers

3
votes

It works fine for me (note that Boot will auto-configure the error handler)...

@SpringBootApplication
public class So56728833Application {

    public static void main(String[] args) {
        SpringApplication.run(So56728833Application.class, args);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
        eh.setClassifier( // retry for all except deserialization exceptions
                new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
        return eh;
    }

    @KafkaListener(id = "so56728833"
            + "", topics = "so56728833")
    public void listen(Foo in) {
        System.out.println(in);
        if (in.getBar().equals("baz")) {
            throw new IllegalStateException("Test retries");
        }
    }

    @KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
    public void listenDLT(Object in) {
        System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.example
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.value.default.type: com.example.So56728833Application$Foo
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

logging:
  level:
    org.springframework.kafka: trace

I have 3 records in the topic:

"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"

I see the first one going directly to the DLT, and the second one goes there after 3 attempts.