4
votes

I am working on fetching messages from a topic starting at a specific offset, and I am using seek() to do it. But when I set enable.auto.commit to true or commit manually (commitSync()/commitAsync()), seek() does not work: the consumer does not poll messages from the specified offset but instead picks up from the last committed offset.

So when using seek(), is it mandatory to store the offsets in an external DB and not commit to Kafka? Can seek and commit not be used together?

Client Version - kafka-clients - 2.4.0

Thanks!!

1
Could you please share your code and consumer configs? - H.Ç.T
Q1: No. Q2: They work together. Please share what you have done (code & configs), so we can help you better. - Ashish Bhosle
@H.Ç.T & @Ashish Bhosle, apologies. There was a mistake in the code I've written. seek() is working when I am using auto/manual commits. - kavin

1 Answer

5
votes

When you commit (whether automatically or manually makes little difference), you are storing at the broker a record of how far into a partition a consumer has reached. This committed offset is only ever used in the event of a rebalance (which includes a consumer first joining the group), so that when a consumer is assigned that partition it can pick up from a point where all previous messages are known to have been processed. As long as consumers are coded correctly and process messages sequentially, this guarantees that messages will not be lost on consumption when group membership changes.

When the group membership is stable, the committed offset is not consulted. Each consumer maintains its own in-memory position, which it uses each time it fetches a batch of records from the broker. By default this position advances sequentially. The seek method changes only this in-memory position, so the next poll fetches from whatever offset you specified. If that offset does not exist in the partition, the consumer falls back to its auto.offset.reset policy, and an exception is thrown only when that policy is set to none.
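To make the distinction between the committed offset and the in-memory position concrete, here is a small toy model in plain Java. It is not the Kafka client API: `ToyConsumer` and all of its methods are hypothetical names invented for illustration, and it models a single consumer on a single partition with records at every offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT the Kafka client API.
// It mimics one consumer reading one partition.
class ToyConsumer {
    private long committed = 0; // offset stored at the "broker" by commit()
    private long position = 0;  // in-memory position used by every poll()

    // Return the next n offsets and advance the in-memory position.
    List<Long> poll(int n) {
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            batch.add(position++);
        }
        return batch;
    }

    // Committing records the current position; it does not move it.
    void commit() { committed = position; }

    // Seeking moves only the in-memory position; the committed offset is untouched.
    void seek(long offset) { position = offset; }

    // After a rebalance the position is re-initialised from the committed offset.
    void rebalance() { position = committed; }

    long committed() { return committed; }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        System.out.println(c.poll(3));     // [0, 1, 2]
        c.commit();                        // committed offset is now 3
        c.seek(10);                        // only the in-memory position changes
        System.out.println(c.poll(2));     // [10, 11]
        System.out.println(c.committed()); // 3
        c.rebalance();                     // position falls back to the committed offset
        System.out.println(c.poll(1));     // [3]
    }
}
```

In this model, as in the description above, committing and seeking do not interfere with each other: a commit never redirects the next poll, and a seek is only "forgotten" when a rebalance resets the position.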

If you store offsets externally, seek can be used after a rebalance to resume from the externally stored offsets, but in that case you have to call seek in a ConsumerRebalanceListener. Calling seek before poll will not help, because the consumer only finds out about the rebalance and its new partition assignment inside the poll method; unless you intervene during poll, it will consume from the last committed offset.
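A minimal sketch of that pattern with the Java client might look like the following. The broker address, group id, topic name, and the `externalOffsets` map (a stand-in for a real external store such as a database) are all assumptions made for illustration; a real application would persist offsets durably and handle errors and shutdown.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekOnAssign {
    // Hypothetical stand-in for an external offset store (e.g. a database table).
    static final Map<TopicPartition, Long> externalOffsets = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets live externally
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"), // assumed topic name
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Offsets are saved per record in the loop below, so nothing to do here.
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Invoked from inside poll(), once the new assignment is known.
                        for (TopicPartition tp : partitions) {
                            Long next = externalOffsets.get(tp);
                            if (next != null) {
                                consumer.seek(tp, next); // resume from the externally stored offset
                            }
                        }
                    }
                });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record here, then remember the offset of the next one to read.
                    externalOffsets.put(
                        new TopicPartition(record.topic(), record.partition()),
                        record.offset() + 1);
                }
            }
        }
    }
}
```

Because onPartitionsAssigned runs inside poll, the seek happens after the consumer knows its assignment but before any records are returned for the newly assigned partitions.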

This slightly unintuitive situation also arises when you pause consumers, something I wrote about at https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html?m=1