20
votes

I want to delete a Kafka consumer group so that when the application creates a consumer and subscribes to a topic, it can start at the beginning of the topic data.

This is on a single-node development VM running the latest Confluent Platform, 3.1.2, which uses Kafka 0.10.1.1.

I try the normal syntax:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group

I get the error:

Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.

If I try the zookeeper variant:

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group

I get:

Delete for group my_consumer_group failed because group does not exist.

If I list using the "old" consumer, I do not see my consumer group (or any other consumer groups):

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list

If I list using the "new" consumer, I can see my consumer group but apparently I can't delete it:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list
5
There is no out-of-the-box solution for this. - Natalia
The shell tool can only delete ZooKeeper-based consumer groups. For groups that use the new consumer, you don't need to delete the group, since Kafka removes expired groups automatically. If you do want to reset the offsets for such a group, set offsets.retention.minutes to a smaller value. - amethystic
seekToBeginning is only offered in the Java API. It is definitely not in the Python API. And even in the Java API, it is less than ideal. Kafka 0.10.2 solves my underlying scenario in a cleaner way. - clay

5 Answers

22
votes

This can be done with Kafka 1.1.x. From the documentation:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

9
votes

In Kafka 0.11 (or Confluent 3.3) you can reset the offsets of any existing consumer group without having to delete the topic. In fact you can change the offsets to any absolute offset value or timestamp or any relative position as well.

These new functions are all added with the new --reset-offsets flag on the kafka-consumer-groups command line tool.

See KIP-122 for details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

4
votes

If you use the Java client, you can first get the beginning offset of the partition:

TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));

Then get the offset the consumer would start processing from if you do not delete the consumer group, that is, the group's last committed offset:

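OffsetAndMetadata committed = consumer.committed(partition); // null if the group has no committed offset
Long committedOffset = (committed == null) ? null : committed.offset();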

If starting from committedOffset is acceptable, just poll for records. If you want to start from the beginning offset instead, call consumer.seek(partition, map.get(partition)) before polling.
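Putting the steps of this answer together, a minimal sketch might look like the following. It is not a definitive implementation: it assumes kafka-clients 2.0 or later on the classpath (for poll(Duration); the 0.10.x client from the question would use poll(long) instead), a broker at localhost:9092, and a topic named my-topic with a partition 0. The class name ReplayFromBeginning and the helper rewindToBeginning are hypothetical names chosen for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReplayFromBeginning {

    // Hypothetical helper: look up the partition's beginning offset and seek to it.
    // The partition must already be assigned to the consumer.
    static long rewindToBeginning(Consumer<?, ?> consumer, TopicPartition partition) {
        Map<TopicPartition, Long> beginning =
                consumer.beginningOffsets(Collections.singleton(partition));
        long start = beginning.get(partition);
        consumer.seek(partition, start);
        return start;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "my_consumer_group");
        // Do not auto-commit while replaying, so the group's committed offsets are left alone.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        TopicPartition partition = new TopicPartition("my-topic", 0); // assumed topic and partition

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment, so the seek can happen before the first poll.
            consumer.assign(Collections.singleton(partition));
            long start = rewindToBeginning(consumer, partition);
            System.out.println("Reading from offset " + start);

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```

The sketch uses assign() rather than subscribe() so that the partition is known before the first poll. With subscribe(), the seek would have to wait until partitions are assigned, for example in a ConsumerRebalanceListener.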

3
votes

Upgrading to the just-released Confluent Platform 3.2 with Kafka 0.10.2 solved my underlying issue. When I delete a topic, its offset information is now correctly reset, so when I create a topic with the same name, consumers start from the beginning of the new data.

I still can't delete new-style consumer groups with the kafka-consumer-groups tool, but my underlying issue is solved.

Before Kafka 0.10.2, there were hacks, but no clean solution to this issue.

0
votes

You can also reset the offsets for a single topic without deleting the entire consumer group:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-earliest --execute