How can I tell a Kafka consumer to drop a record when deserialization of an object fails, log the error, and keep listening for other incoming messages?

My consumer is configured as follows. Consumer properties:

Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());

consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        NullDeserializer.class.getName());

consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG,
        kafkaTelecontrolEventConsumerProperties.getClientIdConfig());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
        kafkaTelecontrolEventConsumerProperties.getGroupIdConfig());

return new DefaultKafkaConsumerFactory<>(consumerProperties);

NullDeserializer is the deserializer I use for testing:

@Log4j2
public class NullDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Object deserialize(String topic, byte[] data) {
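        try {
            // Parse only to validate the payload; the result is discarded on purpose.
            TeleControl.Event.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            // Surface the parse failure to the consumer as an unchecked exception.
            throw new RuntimeException(e);
        }
        return null;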
    }

    @Override
    public void close() {

    }
}

IntegrationFlow:

return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(telecontrolEventConsumerFactory,
                        eventConsumerProperties.getTelecontrolEventTopic())
        )
        .handle(System.err::println)
        .get();

My main problem: when the RuntimeException is thrown because parsing fails, how do I tell the Kafka consumer to log the error, drop the record, and continue with the next Kafka message?

1 Answer

See the ErrorHandlingDeserializer, introduced in Spring for Apache Kafka 2.2.

The upcoming 2.2.1 release (due tomorrow) replaces it with ErrorHandlingDeserializer2, which is type-safe.

In both cases, the deserialization exception is passed to the listener container, which in turn passes it directly to the ErrorHandler instead of calling the listener.
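
As a rough sketch of how the consumer properties from the question might be adapted, the wrapper can be registered as the value deserializer, with the real deserializer named as its delegate. The example uses plain string keys so it depends only on the JDK. The delegate key `spring.deserializer.value.delegate.class` is what I believe `ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS` resolves to, so check it against your version. The class name `ErrorHandlingConsumerProps` and the package `com.example` in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the consumer property map with plain string keys,
// so the example has no dependencies beyond the JDK.
final class ErrorHandlingConsumerProps {

    // Fully qualified name of the wrapper class mentioned in the answer.
    static final String ERROR_HANDLING_DESERIALIZER =
            "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2";

    // Assumed property key naming the delegate used for record values
    // (believed to match ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS).
    static final String VALUE_DELEGATE_KEY = "spring.deserializer.value.delegate.class";

    static Map<String, Object> build(String bootstrapServers,
                                     String groupId,
                                     String valueDelegateClass) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The wrapper is what the Kafka client instantiates for values...
        props.put("value.deserializer", ERROR_HANDLING_DESERIALIZER);
        // ...and the wrapper delegates the actual parsing to this class.
        props.put(VALUE_DELEGATE_KEY, valueDelegateClass);
        return props;
    }
}
```

Calling `ErrorHandlingConsumerProps.build("localhost:9092", "demo-group", "com.example.NullDeserializer")` yields a map that could be passed to `DefaultKafkaConsumerFactory` in place of the one in the question. With this setup the delegate's exception should no longer escape from the consumer's poll, so the container can hand it to the error handler and move on to the next record.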