I have read the spring-kafka/Kafka documentation back and forth and still cannot find a way to get proper transactional behavior with error recovery. I believe this is not a trivial question, so please read until the end. I believe the whole question revolves around finding a way to reposition past the failing record, or a way to ack from the error handler. But maybe there are better ways; I don't know.
So records are flowing in, and some of them are invalid. What I would like to have as a minimal solution (on top of which I will then fix several problems you probably see as well) is:
1) We cannot afford the luxury of stopping production because of some trivial mishap, like one or a few invalid records. So if there is an invalid record in the Kafka topic, I would like to log it, or resend it to a different topic, and then proceed with processing the following records.
2) There are permanent and temporary failures. A permanent failure is a record that cannot be deserialized or that fails data validation. In that case I'd like to skip the invalid record, as described in 1). A temporary failure might be some specific exception or state, for example database connection errors, network issues, etc. In that case we do not want to skip the failing record; we want to retry it after some delay.
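The skip/retry split in 2) boils down to a classification step applied to each failure. Below is a minimal, library-independent sketch; `FailureClassifier` is a hypothetical name, and the list of "temporary" exception types is an assumption to adapt to the real environment. Anything not recognized as temporary (deserialization errors, validation errors, unknown exceptions) is treated as permanent and skipped.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.sql.SQLTransientException;
import java.util.List;

/**
 * Hypothetical helper: decides whether a failed record should be skipped
 * (permanent failure) or retried later (temporary failure).
 */
final class FailureClassifier {

    enum Decision { SKIP, RETRY }

    // Assumed "temporary" failure types (network, transient DB errors); adjust as needed.
    private static final List<Class<? extends Exception>> TEMPORARY = List.of(
            ConnectException.class,
            SocketTimeoutException.class,
            SQLTransientException.class);

    // Guard against pathological cause chains that loop back on themselves.
    private static final int MAX_CAUSE_DEPTH = 16;

    private FailureClassifier() { }

    /** RETRY if the failure or any of its causes is a temporary type, otherwise SKIP. */
    static Decision classify(Throwable failure) {
        Throwable current = failure;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            for (Class<? extends Exception> type : TEMPORARY) {
                if (type.isInstance(current)) {
                    return Decision.RETRY;
                }
            }
            current = current.getCause();
        }
        // Deserialization errors, validation errors and anything unknown: skip.
        return Decision.SKIP;
    }
}
```

Defaulting to SKIP matches the wording of 2), where only specific exceptions count as temporary. As far as I know, spring-kafka's `SeekToCurrentErrorHandler` (2.3+) takes the opposite default: it retries everything except exception types registered as not retryable, with deserialization and conversion failures treated as not retryable out of the box.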
The subject of this question is ONLY implementing the skip/don't-skip behavior.
Let's say that this is our starting point:
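    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    // SomeClass and AvroDeserializer are application classes; the class name below is illustrative.
    @Configuration
    public class KafkaConsumerConfig {

        private Map<String, Object> createKafkaConsumerFactoryProperties(
                String bootstrapServers, String groupId, Class<?> valueDeserializerClass) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return props;
        }

        @Bean(name = "SomeFactory")
        public ConcurrentKafkaListenerContainerFactory<String, SomeClass> kafkaListenerContainerFactory(
                @Value("${…}") String bootstrapServers,
                @Value("${…}") String groupId) {
            ConcurrentKafkaListenerContainerFactory<String, SomeClass> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            // The deserializer instances passed here take precedence over the *_DESERIALIZER_CLASS_CONFIG entries.
            ConsumerFactory<String, SomeClass> consumerFactory = new DefaultKafkaConsumerFactory<>(
                    createKafkaConsumerFactoryProperties(bootstrapServers, groupId, AvroDeserializer.class),
                    new StringDeserializer(),
                    new AvroDeserializer(SomeClass.class));
            factory.setConsumerFactory(consumerFactory);
            // factory.setConcurrency(2);
            // factory.setBatchListener(true); // needed when the listener takes a List<...>
            return factory;
        }
    }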
And we have a listener like:

    @KafkaListener(topics = "${…}", containerFactory = "SomeFactory")
    public void receive(@Valid List<SomeClass> messageList) {
        /* logic */
    }

Now, here is how this behaves, if I understand correctly:
When the listener gets a message (roughly, once we are inside the receive method), the Kafka message has already been acked, and if the receive method throws an exception, the next poll will return the following record. Because the ack has already happened and we do not have an error handler defined, the logging error handler kicks in. This is not necessarily what we want. We can use SeekToCurrentErrorHandler to reprocess the message. Alternatively, one can specify a TransactionManager, and if an exception 'leaks' from the listener, repositioning will also happen. If someone knows how these two approaches compare performance-wise, please tell me.
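To make the retry-then-skip idea concrete, here is a single-partition model of "seek to current" handling with a bounded number of attempts and a back-off. It is plain Java, not spring-kafka code: `SeekToCurrentModel`, `consume`, and the recoverer callback are hypothetical names. My understanding is that `SeekToCurrentErrorHandler` configured with a `BackOff` and a recoverer behaves along these lines, but this is a sketch of the idea under that assumption, not its implementation.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Hypothetical single-partition model of "seek to current" error handling.
 * On failure the position is not advanced, so the same record is delivered
 * again after a back-off, until it succeeds or the attempt budget runs out;
 * then the record is handed to a recoverer (log it, dead-letter it) and skipped.
 */
final class SeekToCurrentModel {

    private SeekToCurrentModel() { }

    static <T> void consume(List<T> partition,
                            Consumer<T> listener,
                            Predicate<Exception> retryable,
                            int maxAttempts, // counts the first delivery too
                            long backOffMillis,
                            BiConsumer<T, Exception> recoverer) {
        int position = 0; // index of the next record to deliver (its "offset")
        int attempts = 0; // deliveries so far of the record at 'position'
        while (position < partition.size()) {
            T record = partition.get(position);
            try {
                listener.accept(record);
                position++; // success: move on to the next record
                attempts = 0;
            } catch (RuntimeException e) {
                attempts++;
                if (retryable.test(e) && attempts < maxAttempts) {
                    // Temporary failure: keep the position ("seek to current") and back off.
                    try {
                        Thread.sleep(backOffMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    // Permanent failure or retries exhausted: recover, then skip the record.
                    recoverer.accept(record, e);
                    position++;
                    attempts = 0;
                }
            }
        }
    }
}
```

One caveat for a real consumer: the back-off runs on the consumer thread, so the total delay has to stay well below `max.poll.interval.ms`; otherwise the consumer is considered dead and the group rebalances.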
When a message cannot be deserialized, the deserializer fails, the message is not acked, and the same record is polled again. This is a sort of "poison pill", since Kafka will spin on this message indefinitely. We do have retry.backoff.ms to at least slow it down, but I can't see any maximum number of retries or anything similar. So the best thing we can do is to stop/pause the container in this situation, which is way too harsh. By the way, I'm new to Kafka/spring-kafka, and I did not see any mention anywhere of how to manually reposition the offset from outside the application, meaning: OK, the listener is down, but now what? Another solution would be not to fail in the deserializer and return something instead. But what? KafkaNull, great, but then our listener will fail with a ClassCastException because it expects SomeClass. We could return some artificial SomeClass value, which is again horrible, because that is not the data we actually received. It is also architecturally incorrect.
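One way out of the "return something, but what?" dilemma is to have the deserializer return a wrapper that carries either the value or the failure, and to route failures away from the listener, so the poll never throws and the listener only ever sees real values. The sketch below models this in plain Java; `SafeDeserializer`, `Result`, and `dispatch` are hypothetical names. As far as I know, spring-kafka's `ErrorHandlingDeserializer` (named `ErrorHandlingDeserializer2` in some older releases) follows a similar idea: it catches the delegate's exception, returns null, and stores the failure in a record header so that the container passes a `DeserializationException` to the error handler instead of invoking the listener.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical wrapper around a real deserializer: instead of throwing (which
 * makes the consumer re-poll the same record forever), it returns a Result that
 * carries either the value or the failure, together with the raw bytes.
 */
final class SafeDeserializer<T> {

    static final class Result<R> {
        final R value;           // null when deserialization failed
        final byte[] raw;        // original payload, kept for logging or dead-lettering
        final Exception failure; // null on success

        Result(R value, byte[] raw, Exception failure) {
            this.value = value;
            this.raw = raw;
            this.failure = failure;
        }

        boolean failed() {
            return failure != null;
        }
    }

    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Result<T> deserialize(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), data, null);
        } catch (RuntimeException e) {
            return new Result<>(null, data, e);
        }
    }

    /** Good records go to the listener; bad ones go to onPoison and are never retried. */
    static <V> void dispatch(List<byte[]> batch,
                             SafeDeserializer<V> deserializer,
                             Consumer<? super V> listener,
                             BiConsumer<byte[], Exception> onPoison) {
        for (byte[] data : batch) {
            Result<V> result = deserializer.deserialize(data);
            if (result.failed()) {
                onPoison.accept(result.raw, result.failure);
            } else {
                listener.accept(result.value);
            }
        }
    }
}
```

The listener's signature stays typed (no KafkaNull, no artificial placeholder values), and the raw bytes of a bad record remain available for logging or resending elsewhere, which is what 1) asks for.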
Or we can use a repositioning error handler, which would be great, if only we knew how to do that. I need to seek to the next record. But while the documentation says that the ErrorHandler should communicate which record caused the failure, it seems to fail to do so. So even with a non-batch listener I get a list of records (1 failed + a bunch of unprocessed ones), and I have no idea where to set the offset.
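As far as I can tell from the spring-kafka docs, the list of records passed to a remaining-records error handler such as `SeekToCurrentErrorHandler` starts with the record that failed, followed by the records from the same poll that were not processed yet. Under that assumption, deciding where to seek is mechanical: per partition, rewind to the earliest remaining offset, and when skipping, drop the failed record so its partition resumes at `offset + 1`. The model below uses a hypothetical `Rec` type instead of `ConsumerRecord`, and string keys in the `topic-partition` form instead of `TopicPartition`, so it runs without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of where to seek after a failure, given the "remaining
 * records" list an error handler receives (assumed: failed record first,
 * then the not-yet-processed records from the same poll).
 */
final class SeekPlanner {

    /** Stand-in for ConsumerRecord: only the coordinates that matter for seeking. */
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        /** Same "topic-partition" form that TopicPartition.toString() prints. */
        String key() {
            return topic + "-" + partition;
        }
    }

    private SeekPlanner() { }

    /**
     * Returns partition -> offset to seek to. With skipFailed=false the failed
     * record is redelivered; with skipFailed=true its partition resumes after it.
     */
    static Map<String, Long> seeksFor(List<Rec> remaining, boolean skipFailed) {
        Map<String, Long> seeks = new LinkedHashMap<>();
        if (remaining.isEmpty()) {
            return seeks;
        }
        Rec failed = remaining.get(0); // assumption: the failed record comes first
        for (int i = skipFailed ? 1 : 0; i < remaining.size(); i++) {
            Rec r = remaining.get(i);
            // Rewind each partition to its earliest remaining offset.
            seeks.merge(r.key(), r.offset, Long::min);
        }
        // When skipping and nothing else from that partition remains, resume after the failed record.
        seeks.putIfAbsent(failed.key(), failed.offset + 1);
        return seeks;
    }
}
```

Seeking only ever needs to rewind: after a poll, the consumer's position is already past every record that poll returned, so the explicit `offset + 1` entry for a skipped failed record with nothing else pending in its partition is effectively a no-op that just makes the intent visible.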
So what is the solution to this madness? The best I can come up with right now is pretty ugly: do not fail in the deserializer (bad), do not accept a specific type in the listener (bad), filter out KafkaNulls manually (bad), and finally trigger bean validation manually (bad). Is there a better way? Thanks for any explanation; I'd be grateful for every hint or direction on how to achieve this.