Let’s assume enable.auto.commit=true, and that the topic I read messages from has long periods of inactivity (say, no messages for 48h). As a result, successive poll() calls won’t return any messages for 48h. My question is:

Will the last returned message’s offset (the same one for 48h) be committed again and again, every auto.commit.interval.ms, to the __consumer_offsets topic, which is compacted and whose expiration is controlled by offsets.retention.minutes?

Committing it again and again would prevent the record in the __consumer_offsets topic from expiring and being deleted at some point.
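The concern above can be made concrete with a toy model. This is a minimal sketch under a simplifying assumption: that a committed offset is kept for offsets.retention.minutes after its most recent commit. The broker's actual expiration rules are more involved and differ between Kafka versions, and the class OffsetRetentionModel is hypothetical.

```java
// Hypothetical model, not the broker's real logic: a committed offset is
// retained for retentionMs after its most recent commit.
final class OffsetRetentionModel {
    private final long retentionMs;
    private long lastCommitMs = -1; // -1 means nothing has been committed yet

    OffsetRetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Each commit, even of an unchanged offset, refreshes the timestamp.
    void commit(long nowMs) {
        lastCommitMs = nowMs;
    }

    boolean isRetained(long nowMs) {
        return lastCommitMs >= 0 && nowMs - lastCommitMs <= retentionMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;

        OffsetRetentionModel once = new OffsetRetentionModel(24 * hour);
        once.commit(0); // committed once, then 48h of inactivity
        System.out.println(once.isRetained(48 * hour)); // false

        OffsetRetentionModel repeated = new OffsetRetentionModel(24 * hour);
        for (long t = 0; t <= 48 * hour; t += 1000) {
            repeated.commit(t); // re-committed every 1000 ms (assumed auto.commit.interval.ms)
        }
        System.out.println(repeated.isRetained(48 * hour)); // true
    }
}
```

Under this assumed rule, a single commit followed by 48h of silence is gone with a 24h retention, while periodic re-commits of the same offset keep it alive.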

This is an interesting one.

Edit: Updated based on a recent comment. My original answer was "No"; the corrected answer is "Yes", and the newly added material is marked explicitly as an edit.

I would go with "Yes", i.e. the last returned message's offset WILL be committed again and again, even if no new messages have arrived in the topic.

Here's an explanation.

A typical consumer example would look something like this:

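import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // example value, adjust for your cluster
props.put("group.id", "test-group");              // example value
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // With auto-commit on, each poll() can also trigger a periodic offset commit
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}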

So, with auto-commit enabled, the consumer is responsible for committing offsets, and those commits are driven by the poll loop.

Now, in the scenario you've described, every call to poll() after the last commit returns an empty set of records. If poll() returns no records, there are no new offsets to commit; the consumer's position simply stays where it was.
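A minimal sketch of that reasoning, assuming the usual convention that a consumer's position is the offset of the last consumed record plus one. PositionTracker and nextPosition are hypothetical helpers for illustration, not Kafka APIs.

```java
import java.util.List;

// Hypothetical helper, not a Kafka API: computes the consumer position
// (next offset to read) after a batch of records has been processed.
final class PositionTracker {
    static long nextPosition(long currentPosition, List<Long> batchOffsets) {
        long position = currentPosition;
        for (long offset : batchOffsets) {
            position = Math.max(position, offset + 1); // position = last offset + 1
        }
        return position; // an empty batch leaves the position unchanged
    }

    public static void main(String[] args) {
        long pos = nextPosition(0, List.of(0L, 1L, 2L, 3L, 4L));
        System.out.println(pos);                          // 5
        System.out.println(nextPosition(pos, List.of())); // 5: nothing new to commit
    }
}
```

"No new offsets" does not by itself mean "no commit": the position is unchanged, but the auto-commit logic discussed in the edit below can still commit that unchanged value.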

Here's how I traced this through the Kafka source code. The following return statement is from the definition of KafkaConsumer.poll():

return ConsumerRecords.empty();

The empty() method is defined in the ConsumerRecords class.

Edit: The following portion is a new addition, based on a comment from Gwen.

However, before that empty result is returned, KafkaConsumer.poll() invokes another poll() method, in the ConsumerCoordinator class. As its definition shows, that method handles periodic offset commits whenever auto-commit is enabled and the interval has elapsed, regardless of whether the poll returned any records, through the following method:

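public void maybeAutoCommitOffsetsAsync(long now) {
    // Commit only if auto-commit is on and the interval has elapsed
    if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
        // Schedule the next auto-commit
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
        // Asynchronously commit the consumer's current positions
        doAutoCommitOffsetsAsync();
    }
}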
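To see why this check leads to the same offset being committed over and over, here is a minimal, self-contained simulation of the deadline logic above. AutoCommitSimulation and its poll(now, recordsReturned) method are hypothetical and are not Kafka's ConsumerCoordinator. Running the commit check before the new records advance the position is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the auto-commit deadline check.
// This is NOT Kafka's ConsumerCoordinator; it only mimics that logic.
final class AutoCommitSimulation {
    private final boolean autoCommitEnabled;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    private long position = 0;                              // next offset to read
    private final List<Long> commitLog = new ArrayList<>(); // offsets "committed" so far

    AutoCommitSimulation(boolean autoCommitEnabled, long autoCommitIntervalMs, long startMs) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextAutoCommitDeadline = startMs + autoCommitIntervalMs;
    }

    // Simulates one poll() at time 'now' that delivers 'recordsReturned' records.
    void poll(long now, int recordsReturned) {
        maybeAutoCommitOffsetsAsync(now); // runs on every poll, empty or not
        position += recordsReturned;
    }

    private void maybeAutoCommitOffsetsAsync(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commitLog.add(position); // commits the current position even if it has not moved
        }
    }

    List<Long> commitLog() {
        return commitLog;
    }

    public static void main(String[] args) {
        AutoCommitSimulation consumer = new AutoCommitSimulation(true, 1000, 0);
        consumer.poll(0, 5); // first poll consumes offsets 0..4, so the position becomes 5
        for (long t = 100; t <= 5000; t += 100) {
            consumer.poll(t, 0); // idle period: every poll returns no records
        }
        System.out.println(consumer.commitLog()); // [5, 5, 5, 5, 5]
    }
}
```

With a 1000 ms interval and empty polls every 100 ms, five seconds of inactivity produce five commits, all of the same offset 5. Over 48h of inactivity the same pattern simply repeats.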

I hope this helps!