1 vote

I have multiple instances of my Spring Boot app consuming from a Kafka topic. Since I want all instances to get data from all partitions of this topic, I assign a different consumer group to each instance, created dynamically when the application starts.

@Configuration
@EnableKafka
public class KafkaStreamConfig {
  @Bean("provisioningStreamsBuilderFactoryBean")
  public StreamsBuilderFactoryBean myStreamsBuilderFactoryBean() {
    String myCGName = "MY-CG-" + UUID.randomUUID().toString();
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(APPLICATION_ID_CONFIG, myCGName); // setting the consumer group name
    // setting other props
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
    streamsBuilderFactoryBean.setStreamsConfiguration(streamsConfiguration);
    return streamsBuilderFactoryBean;
  }
}

So every time an instance restarts or a new instance is created, a new consumer group is created. And this is the consumer that reads from my topic:

@Component
public class MyConsumer {
    @Autowired
    private StreamsBuilder streamsBuilder;

    @PostConstruct
    public void consume() {
      final KStream<String, GenericRecord> events = streamsBuilder.stream("my-topic");
      events
      .selectKey((key, record) -> record.get("id").toString())
      .foreach((id, record) -> {
          // some computations with the events consumed
      });
    }
}

Now, because these dynamically created consumer groups stay around and are no longer used once an instance restarts, they stop consuming messages, show a lot of lag, and give rise to false alerts.

So I'd like to delete these consumer groups when the application shuts down, using Kafka's AdminClient API. I was thinking of deleting them in a shutdown hook, i.e. a method annotated with @PreDestroy inside the MyConsumer class, like this:

@PreDestroy
public void destroyMYCG() {
  try (AdminClient admin = KafkaAdminClient.create(properties)) {
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = admin.deleteConsumerGroups(Collections.singletonList(provGroupName));
    KafkaFuture<Void> future =  deleteConsumerGroupsResult.all();
    future.whenComplete((aVoid, throwable) -> {
      System.out.println("EXCEPTION :: " + ExceptionUtils.getStackTrace(throwable));
    });
  }
  System.out.println(getClass().getCanonicalName() + " :: DESTROYING :: " + provGroupName);
}

but when I try that I get this exception, and the consumer group still shows up in the list of consumer groups:

org.apache.kafka.common.errors.TimeoutException: The AdminClient thread is not accepting new calls.

Can someone please help me with this?


2 Answers

0 votes

Using a UUID as the consumer group name is a bad idea. You can define a fixed, final string as the consumer group name for each Spring Boot app.
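As a rough sketch of this, the application id (from which Kafka Streams derives the consumer group) can come from a fixed constant or a configuration property instead of a UUID. The class and property values below are assumptions for illustration, not from the original post:

```java
import java.util.Properties;

public class FixedGroupNameConfig {
    // A fixed, descriptive group name (hypothetical value), so that a
    // restarted instance rejoins the same consumer group instead of
    // leaving an abandoned one behind.
    static final String APP_ID = "my-provisioning-service";

    static Properties streamsConfiguration() {
        Properties props = new Properties();
        // Kafka Streams derives the consumer group id from application.id
        props.put("application.id", APP_ID);
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        return props;
    }

    public static void main(String[] args) {
        // prints the stable group name used across restarts
        System.out.println(streamsConfiguration().getProperty("application.id"));
    }
}
```

If each instance still needs its own group (so that every instance sees all partitions), the same idea applies with a stable per-instance suffix (e.g. an instance index from configuration) rather than a random UUID.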

0 votes
  1. IMHO it is a logical mistake to create a consumer group with a UUID. Logically, if the same process restarts, it is the same app, i.e. the same consumer. You will solve your problem by giving the consumer groups good names related to what the app logically does.
  2. I would delete consumer groups on the server side, with a "GC" that kicks in at a certain level of lag.

Again, a consumer group is not an application instance id. It is not intended to be randomly created. And honestly speaking, I am not sure what kind of problem you are solving by doing this, because by saying that the consumer group is random, you are saying "my code is doing random things and I have no clue what happens in message processing." We have very complex Kafka message processing, and there is always a better or worse name for the process, but at least one exists that is not random.
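A server-side "GC" along these lines could list the groups, keep only the empty ones matching the dynamic prefix, and delete them. The selection step is sketched below; the MY-CG- prefix is from the question, everything else (class name, the "is empty" snapshot standing in for a real members-plus-lag check) is an assumption:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ConsumerGroupGc {
    // Pure selection step: from a snapshot of group id -> "is empty"
    // (no live members; a real implementation would combine this with
    // a lag threshold), pick the dynamically created groups that are
    // safe to delete.
    static List<String> groupsToDelete(Map<String, Boolean> groupIsEmpty) {
        return groupIsEmpty.entrySet().stream()
                .filter(e -> e.getKey().startsWith("MY-CG-")) // only the dynamic groups
                .filter(Map.Entry::getValue)                  // only empty ones
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
    // Against a real cluster, the snapshot would come from
    // AdminClient#listConsumerGroups and AdminClient#describeConsumerGroups,
    // and the result would be passed to AdminClient#deleteConsumerGroups.
}
```

Running this cleanup out-of-band (a scheduled job on the server side) avoids the shutdown-ordering problem the question runs into with @PreDestroy, since no group deletes itself while the JVM is going down.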