I am having a topic "oranges" with 10 partitions, 2 consumers within 1 consumer group. I am using Spring Kafka.
As for some reason, I need to re-read the data from time to time, I need to reset the offsets. My listener implement ConsumerSeekAware
and in the onPartitionsAssigned()
I simply call callback#seekToBeginning
. This works fine as in the log I see messages from Kafka Client API (2.3.1) saying:
Resetting offset for partition oranges-X to offset 0
. This happens for all partitions fine.
However, effectively only the last partition is reset (9) and from time to time, if I get lucky the second one (1) too. All others are not getting reset at all.
What is getting me real headaches is: if I omit partition 9 from the list of partitions to be reset, all other partitions get reset fine and everything works as expected.
The code is very simple:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
}
...
Logs:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
enable.auto.commit
set tofalse
. If I change it totrue
, it works as expected. Looks like there was some pending offset commits? The docu ofcallback#seekToBeginning
saysQueue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition.
– Martin LinhaonPartitionsAssigned
is called on the consumer thread from thepoll()
and the seeks are now done directly rather than queueing; the seeks are still queued if you save off the callback from inregistereSeekCallback
and call the callback from outside of theonPartitionsAssigned
. – Gary Russell