I'm trying to figure out how to deal with **transient errors** that occur in the message listener while **consuming messages** from a Kafka topic.
I'm using a **@KafkaListener** configured with a **custom errorHandler**. When the **listener throws** an exception and the errorHandler considers it **transient**, the handler **seeks** the consumer to the **current offset** so that the message is re-fetched on the next poll.
The listener receives the message in a loop while it waits for the transient issue to be resolved, as expected, but in some circumstances messages are lost.
I'm using spring-kafka 2.2.5. Here is the relevant Spring Boot configuration for Kafka:
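To make the retry loop concrete, here is a tiny in-memory model with no Kafka or Spring dependency. `SeekRetryModel`, its `poll()`/`seek()` methods and the `failuresBeforeSuccess` parameter are hypothetical stand-ins that only mimic how a fetch position behaves with `max.poll.records: 1`. This is a sketch, not how Spring Kafka or the Kafka client is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition read with max.poll.records=1.
// It only mimics a fetch position; it is not Spring Kafka or the Kafka client.
class SeekRetryModel {

    private final List<String> log;   // record at index i has offset i
    private long position = 0;        // next offset that poll() will return

    SeekRetryModel(List<String> log) {
        this.log = log;
    }

    // Like poll() with max.poll.records=1: return one record and advance the position.
    String poll() {
        if (position >= log.size()) {
            return null;
        }
        return log.get((int) position++);
    }

    // Like Consumer.seek(partition, offset): move the fetch position.
    void seek(long offset) {
        position = offset;
    }

    // The hypothetical listener fails failuresBeforeSuccess times on every record;
    // after each failure the "error handler" seeks back so the next poll re-fetches it.
    // Returns every delivery, in order.
    static List<String> run(List<String> log, int failuresBeforeSuccess) {
        SeekRetryModel consumer = new SeekRetryModel(log);
        List<String> deliveries = new ArrayList<>();
        int failures = 0;
        while (true) {
            long offset = consumer.position;
            String value = consumer.poll();
            if (value == null) {
                break;
            }
            deliveries.add(value);
            if (failures < failuresBeforeSuccess) {
                failures++;              // transient failure
                consumer.seek(offset);   // re-fetch the same record on the next poll
            } else {
                failures = 0;            // success: move on to the next record
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"), 2));
    }
}
```

Running `main` prints `[a, a, a, b, b, b]`: each record is redelivered until the hypothetical transient failure clears, which is the looping behaviour described above.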
```yaml
spring:
  kafka:
    bootstrap-servers: <HOST>:<PORT>
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      group-id: ConsumerGroup
      properties:
        max.poll.records: 1
    listener:
      ack-mode: record
      concurrency: 1
```
As far as I can see, the application loses messages in the following cases:
- An exception that is not raised or handled by the message listener error handler is propagated to the `KafkaMessageListenerContainer`. The container property `ackOnError` defaults to `true`, so the container commits the offset. If the listener is restarted, the message is not processed again by any client in the consumer group, because the offset has already been committed and moved forward;
- No exception or error is propagated to the container, but the listener is restarted in the middle of the transient-error retry loop described above. As in the previous case, clients in the consumer group will not consume the message again, because the container already committed the offset during the first poll.
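The second case can be reproduced with a similar in-memory model. It assumes, as described above, that the error handler returns normally after seeking, so the container treats each delivery as a success and, with `ack-mode: record`, commits the next offset after every delivery. `RecordAckRestartModel`, `failingOffset` and `crashAfter` are hypothetical names, and the model is a simplification of the container's behaviour, not Spring Kafka's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the second case: record-level commits plus an error handler
// that swallows the exception and seeks back. Not Spring Kafka's actual code.
class RecordAckRestartModel {

    // log: records of one partition (offset = index)
    // failingOffset: offset that fails until the restart (hypothetical transient issue)
    // crashAfter: number of deliveries before the consumer is restarted
    // Returns the records processed successfully across both runs.
    static List<String> processed(List<String> log, int failingOffset, int crashAfter) {
        List<String> ok = new ArrayList<>();
        long committed = 0;   // offset stored for the consumer group
        long position = 0;    // consumer fetch position
        int deliveries = 0;

        // First run: the transient issue is active.
        while (position < log.size() && deliveries < crashAfter) {
            long offset = position++;          // poll() with max.poll.records=1
            deliveries++;
            if (offset == failingOffset) {
                position = offset;             // error handler seeks back to the failed record
            } else {
                ok.add(log.get((int) offset));
            }
            // The handler returned normally, so the delivery counts as a success
            // and the next offset is committed either way.
            committed = offset + 1;
        }

        // Restart: a new consumer resumes from the committed offset; the issue is gone.
        for (long offset = committed; offset < log.size(); offset++) {
            ok.add(log.get((int) offset));
        }
        return ok;
    }

    public static void main(String[] args) {
        // "b" (offset 1) fails; the consumer restarts after 3 deliveries (a, b, b).
        System.out.println(processed(List.of("a", "b", "c"), 1, 3));
    }
}
```

With `b` at offset 1 failing and a restart after three deliveries, `main` prints `[a, c]`: the restarted consumer resumes from committed offset 2, so `b` is never processed.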
To fix the first issue, it should be enough to set `ackOnError` to `false`. For the second issue, however, I started wondering whether `AckMode.RECORD` is appropriate in this scenario, because the offset is committed in every case.
Should I move to `MANUAL_IMMEDIATE` in this scenario, or am I missing the point here?
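For comparison, the same kind of model with commits issued only after successful processing approximates what `MANUAL_IMMEDIATE` would give, where the listener takes an `Acknowledgment` parameter and calls `acknowledge()` only when processing succeeds. As before, `ManualAckRestartModel` and its parameters are hypothetical, and this is a simplified sketch rather than Spring Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a listener that commits only after successful processing,
// as acknowledge() would with AckMode.MANUAL_IMMEDIATE. Not Spring Kafka's code.
class ManualAckRestartModel {

    // Same parameters as the record-ack model: failingOffset fails until the restart,
    // crashAfter is the number of deliveries before the consumer is restarted.
    static List<String> processed(List<String> log, int failingOffset, int crashAfter) {
        List<String> ok = new ArrayList<>();
        long committed = 0;   // offset stored for the consumer group
        long position = 0;    // consumer fetch position
        int deliveries = 0;

        // First run: the transient issue is active.
        while (position < log.size() && deliveries < crashAfter) {
            long offset = position++;          // poll() with max.poll.records=1
            deliveries++;
            if (offset == failingOffset) {
                position = offset;             // seek back; no acknowledgment, nothing committed
            } else {
                ok.add(log.get((int) offset));
                committed = offset + 1;        // acknowledge() only after success
            }
        }

        // Restart: resume from the committed offset; the issue is gone.
        for (long offset = committed; offset < log.size(); offset++) {
            ok.add(log.get((int) offset));
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println(processed(List.of("a", "b", "c"), 1, 3));
    }
}
```

In the same scenario `main` prints `[a, b, c]`, because the committed offset stays at 1 while `b` is being retried. The trade-off is at-least-once delivery: if the process stops after a record is processed but before `acknowledge()` commits it, that record is delivered again after the restart.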
Many thanks, kind regards