I have multiple instances of my spring boot app consuming from a kafka topic. Since I want all instances to get data from all partitions of this topic, I assigned different consumers groups for each instances which would be created dynamically when starting this application.
@Configuration
@EnableKafka
public class KafkaStreamConfig {
@Bean("provisioningStreamsBuilderFactoryBean")
public StreamsBuilderFactoryBean myStreamsBuilderFactoryBean() {
String myCGName = "MY-CG-" + UUID.randomUUID().toString();
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(APPLICATION_ID_CONFIG, myCGName); // setting consumer group name
// setting other props
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(streamsConfiguration);
return streamsBuilderFactoryBean;
}
}
So every time an instance restarts or a new instance is created, a new consumer group is created. And this's the consumer which reads from my topic.
@Component
public class MyConsumer {
@Autowired
private StreamsBuilder streamsBuilder;
@PostConstruct
public void consume() {
final KStream<String, GenericRecord> events = streamsBuilder.stream("my-topic");
events
.selectKey((key, record) -> record.get("id").toString())
.foreach((id, record) -> {
// some computations with the events consumed
});
}
}
Now because of these dynamically created consumer groups stay on, and since they're not used in my application once an instance restarts, these don't consume messages anymore and show a lot of lag and hence give rise to false alerts.
So I'd like to delete these consumer groups when the application shuts down with Kafka's AdminClient api. I was thinking of trying to delete it in a shutdown hook like in a method annotated with @PreDestroy inside MyConsumer class like this:
@PreDestroy
public void destroyMYCG() {
try (AdminClient admin = KafkaAdminClient.create(properties)) {
DeleteConsumerGroupsResult deleteConsumerGroupsResult = admin.deleteConsumerGroups(Collections.singletonList(provGroupName));
KafkaFuture<Void> future = deleteConsumerGroupsResult.all();
future.whenComplete((aVoid, throwable) -> {
System.out.println("EXCEPTION :: " + ExceptionUtils.getStackTrace(throwable));
});
}
System.out.println(getClass().getCanonicalName() + " :: DESTROYING :: " + provGroupName);
}
but I'm getting this exception if I tried that and consumer groups still shows up in the list of consumer groups:
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread is not accepting new calls.
Can someone please help me with this?