0
votes

I'd like a batch listener that commits offsets prior to a record that fails, logs the failed record, and then retrieves a new batch starting with the first offset after the failed record.

My current approach handles exceptions thrown in my listener's code, by throwing a BatchListenerFailedException that is handled by the RecoveringBatchErrorHandler as I intend. However, I would like to handle all exceptions in this way; that is, an exception thrown by the listener and any exception due to a deserialization failure. I'm using a BatchMessagingMessageConverter. I understand that I could use an ErrorHandlingDeserializer if the deserialization exception occurred in the Kafka Deserializer; however, deserialization exceptions occur with my configuration in the MessagingMessageConverter, which I believe is after the Kafka client BytesDeserializer has successfully deserialized my message. How can I best achieve my goal?

Here's my container factory config:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
         ConsumerFactory<Object, Object> consumerFactory) {
    // Batch listener container with a single consumer thread. JSON conversion is
    // done at the listener level (Bytes records -> POJOs), not in the Kafka client.
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.setConcurrency(1);
    containerFactory.setBatchListener(true);

    // Convert each record's Bytes value to the listener's parameter type via JSON.
    containerFactory.setMessageConverter(
            new BatchMessagingMessageConverter(new BytesJsonMessageConverter()));

    // Retry a failed batch twice (default back-off interval), then recover:
    // offsets before the failed record are committed and the remainder is re-fetched.
    containerFactory.setBatchErrorHandler(new RecoveringBatchErrorHandler(
            new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)));

    return containerFactory;
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    // Consumer hands raw Bytes to the container; JSON conversion happens later in
    // the BatchMessagingMessageConverter, not in the Kafka client deserializer.
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    // NOTE(review): these two delegate properties are only consulted when the
    // key/value deserializer class above is ErrorHandlingDeserializer; with
    // BytesDeserializer configured they appear to have no effect — confirm intent.
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Offsets are committed by the listener container, not auto-committed by the client.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

And here is my Listener:

@KafkaListener(id = "myKafkaListener", idIsGroup = false, autoStartup = "true", topics = {"pojo-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void receive(List<Message<Pojo>> messages) {
    System.out.println("received " + messages.size() + " messages");
    // Index of the record currently being processed; declared outside the try so
    // it is visible in the catch and can pinpoint the failed record in the batch.
    int index = 0;
    try {
        // Exceptions thrown here are handled as intended: offsets before the
        // failing record are committed and processing resumes after it.
        while (index < messages.size()) {
            Message<Pojo> message = messages.get(index);
            Pojo payload = message.getPayload();
            System.out.println("received: " + payload + " at offset " + message.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
            index++;
        }
    } catch (Exception e) {
        // index still points at the record that threw.
        throw new BatchListenerFailedException("listener threw exception when processing batch", e, index);
    }
}

UPDATE

Here is the stack trace from when I send a string (just "A") instead of a json object, and deserialization fails:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2015) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1859) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1725) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2376) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 8 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:122) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:174) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:322) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:153) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:119) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 16 common frames omitted
1
Which converter is performing the deserialization? Can you show the full stack trace? I think you would need a custom message converter to wrap the conversion exception in a BatchListenerFailedException. — Gary Russell
On second thought, that won’t work. The previous offsets will be committed, which is not what we want. We need to do it similarly to the error handling deserializer: set a special payload or header to tell the listener that deserialization failed. You should be able to reuse the code in the EHD. I can take a look tomorrow if it is not clear. — Gary Russell
After more thought, I think handling this situation is going to be more involved; it will need changes in the listener adapter. Is there some specific reason you are doing conversion at the listener converter level instead of in the deserializer? — Gary Russell
In at least one of my use cases, doing the conversion in the deserializer would work. Thanks for pointing this out. I also have a Spring Integration listener (similar to docs.spring.io/spring-integration/docs/current/reference/html/…) which currently uses KafkaMessageDrivenChannelAdapter.ListenerMode.batch, ContainerProperties.AckMode.BATCH and a BatchMessagingMessageConverter using a BytesJsonMessageConverter. Would your deserializer solution (Example 1) also work in that scenario assuming I configure my ConsumerFactory correctly? — Chad Showalter
Yes, it would work there too. — Gary Russell

1 Answer

2
votes

Here are two solutions; the first uses the ErrorHandlingDeserializer and JsonDeserializer. The second is a work-around that uses a ByteArrayDeserializer with a custom ByteArrayJsonMessageConverter. I have opened an issue to provide a more seamless solution in the batch listener adapter.

Example 1, using deserializer:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.listener.type=batch
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    // Main topic; a single partition keeps the batch ordering simple.
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    // Dead-letter topic for records that fail deserialization.
    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    /**
     * Batch listener. The ErrorHandlingDeserializer delivers a null payload plus a
     * failure header for a record it could not deserialize; throwing
     * BatchListenerFailedException at that index lets the batch error handler commit
     * the preceding offsets and hand the bad record to the recoverer.
     */
    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
        for (int idx = 0; idx < in.size(); idx++) {
            Foo payload = in.get(idx);
            boolean deserializationFailed = payload == null
                    && headers.get(idx).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null;
            if (deserializationFailed) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), idx);
            }
            System.out.println(payload);
        }
    }

    // Reads the DLT as plain text so the raw, undeserializable payload is visible.
    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                ":org.apache.kafka.common.serialization.StringDeserializer")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    // Error handler that publishes a failed record's raw bytes to the DLT.
    @Bean
    BatchErrorHandler eh(ProducerFactory<String, byte[]> pf) {
        KafkaTemplate<String, byte[]> dltTemplate = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
        RecoveringBatchErrorHandler handler = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(dltTemplate));
        handler.setLogLevel(Level.DEBUG);
        return handler;
    }

    // Sends two good JSON records with one non-JSON record between them.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}");
            template.send("so69055510", "JUNK");
            template.send("so69055510", "{\"bar\":\"qux\"}");
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]

Example 2, using a custom message converter. Note that, for this work around, you need some way to indicate in your domain object that deserialization failed:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    // Main topic; a single partition keeps the batch ordering simple.
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    // Dead-letter topic for records the listener rejects.
    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    // Batch listener. The converter bean below substitutes Foo("thisIsABadOne")
    // when JSON conversion fails, so that sentinel value marks a bad record here;
    // throwing BatchListenerFailedException at index i commits the offsets before
    // it and routes the raw record to the DLT via the error handler.
    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo.getBar().equals("thisIsABadOne")) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    // NOTE(review): the value deserializer here is ByteArrayDeserializer, so the
    // String parameter relies on message conversion; confirm the converter bean
    // below does not attempt (and fail) JSON conversion of the raw DLT payload.
    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    // Converter that forces the target type to Foo and, instead of propagating a
    // ConversionException out of the batch adapter, returns a sentinel Foo the
    // listener can detect by value.
    @Bean
    ByteArrayJsonMessageConverter converter() {
        return new ByteArrayJsonMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                try {
                    return super.toMessage(record, acknowledgment, consumer, Foo.class); // <<<<<< type
                }
                catch (ConversionException ex) {
                    // NOTE(review): this fallback message carries no Kafka headers
                    // (offset/partition etc.) — acceptable here because the listener
                    // only uses the batch index, but confirm nothing else reads them.
                    return MessageBuilder.withPayload(new Foo("thisIsABadOne"))
                            .build();
                }
            }

        };
    }

    // Error handler that publishes a failed record's raw bytes to the DLT.
    @Bean
    BatchErrorHandler eh(KafkaTemplate<String, byte[]> template) {
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    // Sends two good JSON records with one non-JSON record between them.
    // NOTE(review): getBytes() uses the platform default charset — consider UTF_8.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, byte[]> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}".getBytes());
            template.send("so69055510", "JUNK".getBytes());
            template.send("so69055510", "{\"bar\":\"qux\"}".getBytes());
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]