I had a Kafka consumer defined with the following properties :
session.timeout.ms = 60000
heartbeat.interval.ms = 6000
We noticed a lag of ~2000 messages and saw that the same message is being consumed multiple times by the consumer (via our app logs). Also, noticed that some of the messages were taking ~10 seconds to be completely processed. Our suspicion was that the consumer was not committing the offset properly (or was committing the same old offset repeatedly), because of which the same message was being picked up by the consumer.
To fix this, we introduced a few more properties :
auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go
And, we set the concurrency to 1.
This fixed our issue. The lag started to reduce and ultimately came to 0.
But, I am still unclear why the problem occurred in the first place. As I understand, by default :
enable.auto.commit = true
auto.commit.interval.ms=5000
So, ideally the consumer should have been committing every 5 seconds. If the message was not completely processed within this timeframe, what happens? What offset is being committed by the consumer? Did the problem occur due to large poll record size (which is 500 by default)
Also, about the poll() method, I read that :
The poll() call is issued in the background at the set auto.commit.interval.ms.
So, originally if the poll() was earlier taking place in every 5 seconds (default auto.commit.interval), why was not it committing the latest offset? Because the consumer was still not done processing it? Then, it should have committed that offset at the next 5th second.
Can someone please answer these queries and explain why the original problem occurred?