7
votes

I have created kafka consumers via new consumer API
I am using kafka 2.10-0.9.0.1
We have 1 consumer group with 1 consumer instance in each group

Kafka script 'kafka-consumer-groups.sh' provides way to delete user but that is applicable only for old consumer groups Running command :

bin/kafka-consumer-groups.sh

Gives

--delete : WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active.

So I want to ask is there any way to delete consumer group created via new consumer API ?

2

2 Answers

5
votes

There is no need to delete with the new consumer. Here is what the script outputs when attempting to delete:

Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves

That's the short answer. More details: By "metadata", two things are meant. First, just the information about the consumers and consumer groups that is stored as part of the group membership coordinator. That is automatically removed if all consumers in the group are gone.

Second, the consumer group has stored the committed offsets in a Kafka topic (when the new consumer is used. Previously these used to be stored with Zookeeper). That topic is not immediately deleted when the consumer group disappears. If the consumer group reappears again, it will find the previous offsets automatically in this topic. It can choose to use them or ignore them. If the consumer group never reappears, these stored offsets are eventually garbage collected automatically.

So in a nutshell, there is no need to delete anything when using the new consumer.

0
votes

In Kafka 2.5.0, there is a Java API in the AdminClient called deleteConsumerGroups which can be used to delete individual ConsumerGroups.

You can use it as shown below:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("*** Starting AdminClient to delete a Consumer Group ***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}