Spring-Kafka: While pausing/resuming consumer using pause/resume method as per documentation, rebalance should not occur when automatic assignment is used but it is not working, rebalancing happening. How to pause/resume consumer and keeping polling after a period without rebalancing?
Use Case: Consumer should pause for a period and keep polling to give heartbeat and resume after time is up but Kafka should not rebalance while consumer is paused.
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
consumer.pause(Collections.singleton(topicPartition));
try {
Thread.sleep(60000);
consumer.resume(Collections.singleton(topicPartition));
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
} catch (InterruptedException e) {
e.printStackTrace();
}
Logs: 2019-02-19 15:19:49.173 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (Re-)joining group 2019-02-19 15:19:49.175 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (Re-)joining group 2019-02-19 15:19:49.181 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] (Re-)joining group
2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Successfully joined group with generation 581 2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Successfully joined group with generation 581
2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.218 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.219 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.223 INFO 82272 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2019-02-19 15:19:49.233 INFO 82272 --- [ main] c.g.s.S.SpringKafkaSupportApplication : Started SpringKafkaSupportApplication in 3.43 seconds (JVM running for 3.85) Consumer[customerTaskExecutor-1] received message[Customer(name=, phoneNumber=20)] Consumer[customerTaskExecutor-2] received message[Customer(name=test 6, phoneNumber=6)] Consumer[customerTaskExecutor-1] Partition [spring-kafka-topic-2] stopped consumption. Consumer[customerTaskExecutor-1] Partition [spring-kafka-topic-1] stopped consumption. 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Attempt to heartbeat failed since group is rebalancing 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Attempt to heartbeat failed since group is rebalancing 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Revoking previously assigned partitions [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Revoking previously assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (Re-)joining group 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (Re-)joining group 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-2, spring-kafka-topic-3] 2019-02-19 15:19:52.211 INFO 82272 --- [rTaskExecutor-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-2, spring-kafka-topic-3] Consumer[customerTaskExecutor-3] received message[Customer(name=test 6, phoneNumber=6)]