1
votes

Spring-Kafka: While pausing/resuming consumer using pause/resume method as per documentation, rebalance should not occur when automatic assignment is used but it is not working, rebalancing happening. How to pause/resume consumer and keeping polling after a period without rebalancing?

Use Case: Consumer should pause for a period and keep polling to give heartbeat and resume after time is up but Kafka should not rebalance while consumer is paused.

            System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
            consumer.pause(Collections.singleton(topicPartition));                    
            try {
                    Thread.sleep(60000);
                    consumer.resume(Collections.singleton(topicPartition));
                    System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
            } catch (InterruptedException e) {              
                       e.printStackTrace();
            }

Logs: 2019-02-19 15:19:49.173 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (Re-)joining group 2019-02-19 15:19:49.175 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (Re-)joining group 2019-02-19 15:19:49.181 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] (Re-)joining group

2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Successfully joined group with generation 581 2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Successfully joined group with generation 581

2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.218 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.219 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.223 INFO 82272 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '' 2019-02-19 15:19:49.233 INFO 82272 --- [ main] c.g.s.S.SpringKafkaSupportApplication : Started SpringKafkaSupportApplication in 3.43 seconds (JVM running for 3.85) Consumer[customerTaskExecutor-1] received message[Customer(name=, phoneNumber=20)] Consumer[customerTaskExecutor-2] received message[Customer(name=test 6, phoneNumber=6)] Consumer[customerTaskExecutor-1] Partition [spring-kafka-topic-2] stopped consumption. Consumer[customerTaskExecutor-1] Partition [spring-kafka-topic-1] stopped consumption. 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Attempt to heartbeat failed since group is rebalancing 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Attempt to heartbeat failed since group is rebalancing 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Revoking previously assigned partitions [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Revoking previously assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (Re-)joining group 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (Re-)joining group 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] Successfully joined group with generation 582 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] Setting newly assigned partitions [spring-kafka-topic-2, spring-kafka-topic-3] 2019-02-19 15:19:52.211 INFO 82272 --- [rTaskExecutor-3] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [spring-kafka-topic-2, spring-kafka-topic-3] Consumer[customerTaskExecutor-3] received message[Customer(name=test 6, phoneNumber=6)]

2

2 Answers

2
votes

Read the Kafka documentation.

Pausing the consumer simply means that subsequent poll()s will return no records until you call resume(), but you still have to call poll() within max.poll.interval.ms in order to prevent a rebalance.

0
votes

Just had same "group" of messages with consumers and Spring Kafka. Same results with @KafkaListener and non-annotated Spring with ConcurrentMessageListenerContainer. Parameter adjustment does not work exactly the same as straight Java.

Re-wrote with straight Java using consumer.poll() and started threads with ExecutorService - adjusted parameters per Gary Russell and everything works properly. No longer get these messages and lost heartbeat during re-balancing. Straight java examples from Clouderable and Confluent websites:

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

https://docs.confluent.io/current/clients/consumer.html#