I am having a topic "oranges" with 10 partitions, 2 consumers within 1 consumer group. I am using Spring Kafka.
As for some reason, I need to re-read the data from time to time, I need to reset the offsets. My listener implement ConsumerSeekAware and in the onPartitionsAssigned() I simply call callback#seekToBeginning. This works fine as in the log I see messages from Kafka Client API (2.3.1) saying:
Resetting offset for partition oranges-X to offset 0. This happens for all partitions fine.
However, effectively only the last partition is reset (9) and from time to time, if I get lucky the second one (1) too. All others are not getting reset at all.
What is getting me real headaches is: if I omit partition 9 from the list of partitions to be reset, all other partitions get reset fine and everything works as expected.
The code is very simple:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
}
...
Logs:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
enable.auto.commitset tofalse. If I change it totrue, it works as expected. Looks like there was some pending offset commits? The docu ofcallback#seekToBeginningsaysQueue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition.- Martin LinhaonPartitionsAssignedis called on the consumer thread from thepoll()and the seeks are now done directly rather than queueing; the seeks are still queued if you save off the callback from inregistereSeekCallbackand call the callback from outside of theonPartitionsAssigned. - Gary Russell