I was wondering how to tell kafka consumer when Deserialization of an object fails to drop the record from kafka, log the error and continue listening to other incoming messages.
I've got my consumer configured as below: ConsumerProperties:
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaPropertiesConfiguration.getBootstrapServers());
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
NullDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG,
kafkaTelecontrolEventConsumerProperties.getClientIdConfig());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaTelecontrolEventConsumerProperties.getGroupIdConfig());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
NullDeserializer.class is my deserializer used for testing:
@Log4j2
public class NullDeserializer implements Deserializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public Object deserialize(String topic, byte[] data) {
try {
TeleControl.Event.parseFrom(data) ;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return null;
}
@Override
public void close() {
}
}
IntegrationFlow:
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(telecontrolEventConsumerFactory,
eventConsumerProperties.getTelecontrolEventTopic())
)
.handle(System.err::println)
.get();
My main problem is when new RuntimeException is thrown when parsing fails, how to tell kafkaConsumer log the error, drop the record and continue processing next kafka message.