3
votes

I have a Kafka consumer defined with the following properties:

session.timeout.ms = 60000
heartbeat.interval.ms = 6000

We noticed a lag of ~2000 messages and saw (via our app logs) that the same message was being consumed multiple times by the consumer. We also noticed that some of the messages were taking ~10 seconds to be completely processed. Our suspicion was that the consumer was not committing the offset properly (or was repeatedly committing the same old offset), so the same message kept being picked up by the consumer.

To fix this, we introduced a few more properties:

auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go

And, we set the concurrency to 1.

This fixed our issue. The lag started to reduce and ultimately came to 0.

But I am still unclear about why the problem occurred in the first place. As I understand it, the defaults are:

enable.auto.commit = true
auto.commit.interval.ms=5000

So, ideally, the consumer should have been committing every 5 seconds. If a message is not completely processed within this timeframe, what happens? Which offset does the consumer commit? Did the problem occur because of the large poll batch size (max.poll.records, which is 500 by default)?

Also, about the poll() method, I read that:

The poll() call is issued in the background at the set auto.commit.interval.ms.

So, if poll() was originally being called every 5 seconds (the default auto.commit.interval.ms), why wasn't it committing the latest offset? Because the consumer was still not done processing the message? In that case, it should have committed that offset 5 seconds later.

Can someone please answer these queries and explain why the original problem occurred?

1 Answer

5
votes

If you are using Spring for Apache Kafka, we recommend setting enable.auto.commit to false so that the container commits the offsets in a more deterministic fashion (either after each record or after each batch of records, which is the default).
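
As a rough illustration of that recommendation, the consumer settings could be collected in a plain map like the one below. This is only a sketch: the class name ConsumerSettings and the build() method are hypothetical, the session and heartbeat values are simply the ones from the question, and the wiring into Spring (typically by handing such a map to a consumer factory) is assumed and omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects consumer properties in a plain map.
final class ConsumerSettings {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Let the listener container commit offsets instead of the client.
        props.put("enable.auto.commit", false);
        // Values taken from the question, unchanged.
        props.put("session.timeout.ms", 60000);
        props.put("heartbeat.interval.ms", 6000);
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with the running config.
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With auto-commit off, auto.commit.interval.ms no longer has any effect, so it does not need to be tuned.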

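Most likely, the problem was max.poll.interval.ms, which is 5 minutes by default. If processing a batch of records takes longer than this, the consumer is considered failed and the group rebalances: its partitions are reassigned, and any records whose offsets were not yet committed are delivered again. That is the behavior you saw. You can either increase max.poll.interval.ms or, as you have done, reduce max.poll.records.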

The key is that you MUST process the records returned by the poll in less than max.poll.interval.ms.
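
To make that budget concrete, here is a small back-of-the-envelope check. It is a sketch under stated assumptions: every record is assumed to take the worst-case ~10 seconds mentioned in the question, records are assumed to be processed one at a time, and the class and method names (PollBudget, worstCaseBatchMillis, fitsWithin) are made up for illustration.

```java
// Hypothetical helper: estimates whether one poll() batch can be processed
// before max.poll.interval.ms expires, assuming sequential processing.
final class PollBudget {

    // Worst-case time to work through one batch, in milliseconds.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if the whole batch finishes strictly inside the poll interval.
    static boolean fitsWithin(int maxPollRecords, long perRecordMillis, long maxPollIntervalMillis) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMillis;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // default max.poll.interval.ms (5 minutes)
        long perRecordMs = 10_000L;        // assumed worst case: ~10 s per message

        // Default max.poll.records = 500: 500 * 10 s = 5,000 s, far over budget.
        System.out.println(fitsWithin(500, perRecordMs, maxPollIntervalMs)); // prints false

        // max.poll.records = 10: 10 * 10 s = 100 s, comfortably inside 300 s.
        System.out.println(fitsWithin(10, perRecordMs, maxPollIntervalMs));  // prints true
    }
}
```

In practice only some messages took ~10 seconds, so the real batch time would be lower than this worst case, but the check shows why a batch of 500 could exceed the limit while a batch of 10 cannot.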

Also, about the poll() method, I read that:

The poll() call is issued in the background at the set auto.commit.interval.ms.

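That is incorrect: poll() is NOT called in the background. It is called by the consumer thread (in Spring, the listener container's thread); only heartbeats are sent in the background, since KIP-62. With auto-commit enabled, offsets are committed from inside poll() once auto.commit.interval.ms has elapsed, so a slow batch delays the next commit as well as the next poll().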