4
votes

My Kafka topic has 10 records in total, spread over 2 partitions with 5 messages each. My consumer group has 2 consumers, and each consumer has already read the 5 messages from its assigned partition. Now I want to re-process the messages in my topic from the beginning (offset 0).

I stopped my Kafka consumers and ran the following command to reset the consumer group offset to 0.

./kafka-consumer-groups.sh --group cg1 --reset-offsets --to-offset 0 --topic t1 --execute --bootstrap-server "..."

My expectation was that once I restarted my Kafka consumers, they would start reading records from offset 0, i.e. the beginning. That didn't happen: they polled from their last position, i.e. offset 5. Why is that so? I then had to make each of my consumers explicitly seek to offset 0 (the beginning) to re-process the records from the start. In later test cycles, I didn't even run the above command to reset the offset for the consumer group.

My question is: if I have to make my consumers explicitly seek to the beginning to re-process messages, then what is the purpose of resetting the offset of a Kafka consumer group?

1
Most likely a consumer setting that is set to 5 (last 5 messages). I think that the server/broker setting is only used if a consumer didn't specify a setting, but I'm not totally sure, e.g. for new consumers that haven't read any message yet. - Gillsoft AB
Assuming the consumers belong to the specified consumer group, please check whether --to-offset 0L works. - A.Dev
Can you post the output of the above command? - Ajinkya

1 Answer

5
votes

Handling Kafka consumer offsets is a bit tricky. A consumer uses the auto.offset.reset config only when its consumer group does not have a valid offset committed in an internal Kafka topic. (ZooKeeper is the other supported offset storage, but recent Kafka versions use the internal Kafka topic.)

Consider the scenarios below:

  1. A consumer in the consumer group named 'group1' has consumed 5 messages from the topic 'testtopic', and the offset details are committed to the internal Kafka topic. The next time the consumer starts, it will not use the 'auto.offset.reset' config. Instead, it will fetch the stored offset and continue fetching messages from that offset.

  2. A consumer in the consumer group named 'group2' is started as a new consumer to fetch messages from 'testtopic'. This is a new group, so no offset details are available in the internal Kafka topic. The 'auto.offset.reset' config is now used to decide where to start: either from the beginning of the topic or from the latest offset (only new messages will be consumed).
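
The rule behind these two scenarios can be sketched in a few lines of shell. This is only a model of the decision, not Kafka's actual implementation: the function name start_position and its arguments are hypothetical, and the offsets in the calls are taken from the example in the question.

```shell
#!/usr/bin/env bash
# Hypothetical helper: models where a consumer starts reading a partition.
# Arguments:
#   $1  committed offset for the group/partition, or "" if none exists
#   $2  auto.offset.reset value (earliest | latest)
#   $3  log start offset of the partition
#   $4  log end offset of the partition
start_position() {
  local committed="$1" policy="$2" log_start="$3" log_end="$4"

  # Scenario 1: a committed offset exists, so auto.offset.reset is ignored.
  if [ -n "$committed" ]; then
    echo "$committed"
    return 0
  fi

  # Scenario 2: no committed offset, so auto.offset.reset decides.
  case "$policy" in
    earliest) echo "$log_start" ;;
    latest)   echo "$log_end" ;;
    *)        echo "unsupported policy in this sketch: $policy" >&2; return 1 ;;
  esac
}

start_position 5  earliest 0 5   # existing group: prints 5
start_position "" earliest 0 5   # new group, earliest: prints 0
start_position "" latest   0 5   # new group, latest: prints 5
```

The first call shows why changing auto.offset.reset alone does not make an existing group re-read a topic: the committed offset takes precedence.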

As per your question, the issue is that the command to reset the offsets does not seem to work, so you have to manually seek to the beginning when starting the consumer. The general form of the reset command is:

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> [--topic <topic_name> or --all-topics] --reset-offsets [--to-earliest or --to-offset <offset>] --execute

There are three possible reasons why the reset command may not work:

  1. The log retention period is short and the offset you are trying to reset to is no longer available.
  2. A consumer instance in the consumer group is still running.
  3. The Kafka version is <0.11. The reset offset API is available only from Kafka 0.11.

From your question, the first and third cases are unlikely. Please check the second case: stop any running consumer instances and then try resetting the offsets again.

The command below can be used to check whether a consumer group has an active consumer instance.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

Sample output:

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99
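
To guard against the second case in a script, the check can be automated. This is a rough sketch that assumes the describe output contains the "has no active members" notice shown in the sample output; the helper name group_is_inactive is hypothetical, and the exact wording of the notice, or the stream it is written to, may differ between Kafka versions.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads `kafka-consumer-groups.sh --describe` output on
# stdin and succeeds only if it reports that the group has no active members.
group_is_inactive() {
  grep -q "has no active members"
}

# Usage sketch (placeholders as in the commands above; stderr is merged
# into stdout in case the notice is written there):
#
#   if kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> \
#        --group <group_id> --describe 2>&1 | group_is_inactive; then
#     kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> \
#       --group <group_id> --topic <topic_name> \
#       --reset-offsets --to-earliest --execute
#   else
#     echo "Stop all consumers in the group before resetting offsets." >&2
#   fi
```

Matching on the message text is fragile, so treat it as a convenience check rather than a guarantee that the group is idle.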