I'm trying to figure out how to deal with **transient errors** that occur in the message listener while **consuming messages** from a Kafka topic.
I'm using a **@KafkaListener** configured with a **custom errorHandler**. When the **listener throws** an exception and the errorHandler considers it **transient**, the handler **seeks** the consumer back to the **current offset** so that the message is re-fetched on the next poll.

The listener receives the message in a loop until the transient issue is resolved, as expected, but in some circumstances messages were lost.

I'm using spring-kafka 2.2.5; here is the relevant Spring Boot configuration for Kafka:

spring:
  kafka:
    bootstrap-servers: <HOST>:<PORT>
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      group-id: ConsumerGroup
      properties:
        max.poll.records: 1
    listener:
      ack-mode: record
      concurrency: 1

As far as I can see, the application loses messages in the following cases:

  • An exception that is not initiated/handled by the listener error handler is propagated to the KafkaMessageListenerContainer. The default value of the container property ackOnError is true, so the container commits the offset. If the listener is restarted, the message is not processed again by any client in the consumer group, because the offset has already been committed and moved forward;
  • No exception is propagated to the container, but the listener is restarted in the middle of the transient-error retry loop described above. As in the previous case, clients in the consumer group will not reconsume the message, because the offset was committed by the container during the first poll.

To fix the first issue, it should be enough to set ackOnError to false. For the second issue, though, I started wondering whether AckMode RECORD is appropriate in this scenario, because the offset is committed in every case.
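To make the second case concrete, here is a minimal sketch in plain Java of the commit behaviour described above. It is a hypothetical in-memory model, not Spring Kafka's container: one partition, max.poll.records=1, a transient failure on one offset that the error handler handles by seeking back, and an application restart after a few retries. It assumes that in RECORD mode the container commits offset + 1 whenever the listener (including its error handler) returns without throwing, while in MANUAL_IMMEDIATE mode only the listener's own acknowledgment after successful processing commits. The class, enum and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory model (NOT Spring Kafka classes) of one partition
 * consumed with max.poll.records=1 and a seek-to-current retry on transient errors.
 */
public class AckModeSimulation {

    enum Mode { RECORD, MANUAL_IMMEDIATE }

    /** Returns the offsets that were successfully processed across both consumer "lifetimes". */
    static List<Long> run(Mode mode, long records, long failingOffset, int failuresBeforeRestart) {
        long committed = 0;                 // last committed offset for the group
        List<Long> processed = new ArrayList<>();

        // First consumer: fails transiently on failingOffset and is restarted mid-retry-loop.
        long position = committed;          // a new consumer starts at the committed offset
        int failures = 0;
        while (position < records) {
            long offset = position++;       // poll() returns one record and advances the position
            boolean ok = offset != failingOffset;
            if (ok) {
                processed.add(offset);
                committed = offset + 1;     // RECORD: container commits; MANUAL_IMMEDIATE: listener acknowledges
            } else {
                position = offset;          // error handler seeks back to the failed record
                if (mode == Mode.RECORD) {
                    // Modeled assumption: the error handler swallowed the exception, so the
                    // container treats the record as processed and still commits offset + 1.
                    committed = offset + 1;
                }
                if (++failures == failuresBeforeRestart) {
                    break;                  // application restarts before the issue clears
                }
            }
        }

        // Second consumer after the restart: the transient problem is gone.
        position = committed;
        while (position < records) {
            processed.add(position++);
            committed = position;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("RECORD:           " + run(Mode.RECORD, 5, 2, 3));           // [0, 1, 3, 4]
        System.out.println("MANUAL_IMMEDIATE: " + run(Mode.MANUAL_IMMEDIATE, 5, 2, 3)); // [0, 1, 2, 3, 4]
    }
}
```

With the same inputs, the RECORD run skips offset 2 after the restart, while the MANUAL_IMMEDIATE run redelivers it, because the committed offset never moves past the failed record.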

Should I move to MANUAL_IMMEDIATE in this scenario or am I missing the point here?

Many thanks, kind regards

Answer

ackOnError has been false by default since version 2.3; it has been deprecated since 2.4 (in favor of GenericErrorHandler.isAckAfterHandle()) and has been removed on master (the future 2.7).

> as the offset has been committed by the container during the first poll.

I am not sure what you mean by that. If you are referring to the logic in the rebalance listener: if the listener is restarted, you'll get a new consumer, so its position() will be the last committed offset.

Starting with version 2.3.6, you can completely disable that logic by setting the assignmentCommitOption to NEVER.
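As a sketch, assuming spring-kafka 2.3.6 or later and Spring Boot's Kafka auto-configuration, the option can be set on the listener container factory. Reusing Boot's ConcurrentKafkaListenerContainerFactoryConfigurer keeps the spring.kafka.listener.* properties (ack-mode, concurrency) from application.yml; the class name KafkaListenerConfig is only illustrative.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaListenerConfig {

    // Naming the bean "kafkaListenerContainerFactory" replaces the factory Boot would auto-configure.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the spring.kafka.listener.* settings from application.yml.
        configurer.configure(factory, kafkaConsumerFactory);
        // Never commit the initial position when partitions are assigned.
        factory.getContainerProperties()
                .setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption.NEVER);
        return factory;
    }
}
```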

2.2.x is end of life (along with Boot 2.1) and will get no more releases; the last 2.2.x version was 2.2.14. 2.2.5 is nearly 2 years old; you should try to stay more up to date.

The latest release is 2.6.3.