I just want to understand Kafka rebalancing. This is my listener method. I have configured a RetryTemplate on the listener container factory to retry 20 times with a 20-second back-off delay. I'm using spring-kafka 1.2.2 (we are planning to upgrade the client) with manual acks.
@KafkaListener(id = "${kafka.listener-id}", topics = "${kafka.topic}")
public void listen(final ConsumerRecord<String, String> consumerRecord,
                   final Acknowledgment acknowledgment) throws ServiceResponseException {
    if (true) { // always fail, to exercise the retry and rebalance behavior
        System.out.println("throwing exception ");
        throw new RuntimeException();
    }
    try {
        acknowledgment.acknowledge();
        LOGGER.info("Kafka acknowledgment sent for Transaction ID:");
    } catch (Exception e) {
        LOGGER.info("Exception encountered when acking record with transaction id: {}");
    }
}
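Here is roughly what "retry 20 times with a 20 s back-off" means for the listener thread. This is a minimal sketch, not spring-retry's implementation. It assumes the RetryTemplate uses a max-attempts policy with maxAttempts = 20 (the first call counts as an attempt) and a fixed back-off. The class, method and exception names are hypothetical. Under that assumption, one failing record can keep the listener thread busy for up to 19 × 20 s = 380 s of sleeping. Interrupting the thread during a sleep ends the retries with an exception, which matches the BackOffInterruptedException ("Thread interrupted while sleeping") in the worker1 logs.

```java
import java.util.concurrent.Callable;

// Minimal fixed back-off retry loop: a hypothetical stand-in for a RetryTemplate with
// a max-attempts policy and a fixed back-off (not spring-retry's code).
class FixedBackOffRetrySketch {

    /** Raised when the retrying thread is interrupted while sleeping between attempts. */
    static class BackOffInterrupted extends RuntimeException {
        BackOffInterrupted(InterruptedException cause) {
            super("Thread interrupted while sleeping", cause);
        }
    }

    /** Calls task up to maxAttempts times, sleeping backOffMillis between failed attempts. */
    static <T> T execute(Callable<T> task, int maxAttempts, long backOffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and maybe try again
            }
            if (attempt < maxAttempts) { // no back-off after the final attempt
                try {
                    Thread.sleep(backOffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt status
                    throw new BackOffInterrupted(ie);
                }
            }
        }
        throw last; // every attempt failed: propagate the last failure
    }

    /** Worst-case time spent sleeping on one record: (maxAttempts - 1) back-offs. */
    static long worstCaseBackOffMillis(int maxAttempts, long backOffMillis) {
        return (maxAttempts - 1) * backOffMillis;
    }

    public static void main(String[] args) {
        // Assuming maxAttempts = 20 and a 20 s fixed back-off.
        System.out.println(worstCaseBackOffMillis(20, 20_000) / 1000 + " s");
    }
}
```

The interrupt is turned into an unchecked exception rather than swallowed so the caller can tell "gave up because of shutdown" apart from "all attempts failed".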
I have 2 workers, each with a concurrency of 3 (three consumer threads per worker, listener-0 to listener-2 in the logs). The topic has 3 partitions. I start worker1, and all 3 partitions are assigned to it. Then I send a message. The listener throws a RuntimeException, and the RetryTemplate keeps retrying it with a 20-second delay. While it is still retrying, I start worker2. A rebalance is triggered, but the partitions are not reassigned right away. On worker1 the retry is interrupted and "Error while processing: ConsumerRecord" is logged (after getContainerProperties().getShutdownTimeout()), and only then do all consumers join the group. The same message is then delivered to worker2.
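The worker1 logs suggest how the 1.x container stops the listener on revocation. The consumer thread (the -C- threads) logs "Stopping invoker" and waits for the listener thread (the -L- threads). It then logs "Invoker timed out while waiting for shutdown and will be canceled" and "Interrupting invoker". For messages-1 that wait runs from 13:52:54.081 to 13:53:24.083, about 30 s. The toy model below reproduces that sequence with an ExecutorService. It assumes the shutdown timeout is a plain timed wait followed by a cancel-with-interrupt. The names are hypothetical and this is not spring-kafka's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of "stop the invoker on partition revocation" (hypothetical names, not spring-kafka code).
class InvokerShutdownSketch {

    /**
     * Starts a listener task that keeps backing off, then does what the revocation
     * callback appears to do: wait up to shutdownTimeoutMillis, then cancel (interrupt).
     * Returns true if the listener was interrupted in the middle of a back-off sleep.
     */
    static boolean revokeWhileRetrying(long backOffMillis, long shutdownTimeoutMillis) throws Exception {
        AtomicBoolean interruptedDuringBackOff = new AtomicBoolean(false);
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        Future<?> invoker = listenerThread.submit(() -> {
            // 19 back-offs between 20 failing listener attempts
            for (int backOff = 0; backOff < 19; backOff++) {
                try {
                    Thread.sleep(backOffMillis);
                } catch (InterruptedException e) {
                    interruptedDuringBackOff.set(true); // spring-retry reports this as BackOffInterruptedException
                    return;
                }
            }
        });
        try {
            invoker.get(shutdownTimeoutMillis, TimeUnit.MILLISECONDS); // "Stopping invoker": timed wait
        } catch (TimeoutException e) {
            invoker.cancel(true); // timed out: cancel, which interrupts the listener thread
        }
        listenerThread.shutdown();
        listenerThread.awaitTermination(5, TimeUnit.SECONDS);
        // Only after this point would the consumer thread go on to rejoin the group.
        return interruptedDuringBackOff.get();
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        boolean interrupted = revokeWhileRetrying(20_000, 300); // timeout scaled down from ~30 s
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("interrupted=" + interrupted + ", revocation took ~" + elapsedMs + " ms");
    }
}
```

In this model, a listener that finishes within the timeout is never interrupted. A listener stuck in a long back-off always costs the full timeout before the revocation callback returns.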
1) This works the way I need it to. But my question is: when a rebalance is triggered, why doesn't partition assignment happen immediately? Instead, it waits for worker1's listener to stop completely (waiting for getContainerProperties().getShutdownTimeout()), and only then do the consumers from worker1 and worker2 join the group.
2) During the rebalance, I observed (see the logs below) that the consumers stop calling poll(). Is that true?
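As far as I understand the consumer group protocol (the details here are my assumption), the rebalance work happens inside poll(). The consumer runs onPartitionsRevoked, sends a JoinGroup request and waits for the coordinator's response. The coordinator only answers once every known member has rejoined or the rebalance timeout expires. That would explain both observations. On worker1, listener-0-C-1 and listener-2-C-1 log "(Re-)joining group" at 13:52:54.26 but "Successfully joined" only at 13:53:24.10, right after listener-1-C-1 finishes its revocation. No "Polling" lines appear from them in between, presumably because they are still inside the earlier poll() call. Below is a toy model with a CountDownLatch standing in for the coordinator. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Toy model of a group rebalance (hypothetical names; not the Kafka client or broker code).
class RebalanceJoinSketch {

    /**
     * Each member's poll() runs its revocation callback, then joins and waits until
     * every member has joined. Member 0 has a slow callback. Returns how long each
     * member's poll() call was blocked, in milliseconds.
     */
    static List<Long> pollDuringRebalance(int members, long slowCallbackMillis) throws Exception {
        CountDownLatch joinComplete = new CountDownLatch(members);
        ExecutorService consumerThreads = Executors.newFixedThreadPool(members);
        List<Future<Long>> pending = new ArrayList<>();
        for (int i = 0; i < members; i++) {
            final boolean slow = (i == 0);
            Callable<Long> poll = () -> {
                long start = System.nanoTime();
                if (slow) {
                    Thread.sleep(slowCallbackMillis); // onPartitionsRevoked waiting on a busy listener
                }
                joinComplete.countDown(); // "send JoinGroup"
                joinComplete.await();     // coordinator answers only once all members have joined
                return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            };
            pending.add(consumerThreads.submit(poll));
        }
        List<Long> blockedMillis = new ArrayList<>();
        for (Future<Long> f : pending) {
            blockedMillis.add(f.get());
        }
        consumerThreads.shutdown();
        return blockedMillis;
    }

    public static void main(String[] args) throws Exception {
        // Six members (2 workers x 3 consumer threads); one of them is stuck for ~300 ms.
        System.out.println(pollDuringRebalance(6, 300));
    }
}
```

In the model, every member's poll() is blocked for about as long as the slowest revocation callback. No member issues another poll() in the meantime, because each one is still waiting inside the current call.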
Trace logs from worker 1:
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-2-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.384 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.384 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:53.977 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.977 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.008 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:54.008 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.023 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-0] for group mris-group
2018-09-23 13:52:54.023 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.023 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.081 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-1] for group mris-group
2018-09-23 13:52:54.081 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.081 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.241 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [messages-2] for group mris-group
2018-09-23 13:52:54.241 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.241 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-2]
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-0]
2018-09-23 13:52:54.264 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:52:54.265 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:53:09.355 DEBUG 6384 --- [ listener-1-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Interrupting invoker
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker timed out while waiting for shutdown and will be canceled.
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[messages-1]
2018-09-23 13:53:24.085 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group mris-group
2018-09-23 13:53:24.101 ERROR 6384 --- [ listener-1-L-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = <removed>, value = <removed>])
org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted
at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:305) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:55) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:34) ~[spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.2.2.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) [na:1.8.0_162]
at org.springframework.retry.backoff.ThreadWaitSleeper.sleep(ThreadWaitSleeper.java:30) ~[spring-retry-1.2.0.RELEASE.jar:na]
at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:83) ~[spring-retry-1.2.0.RELEASE.jar:na]
... 14 common frames omitted
2018-09-23 13:53:24.101 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.101 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-0] for group mris-group
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-2] for group mris-group
2018-09-23 13:53:24.102 INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.103 INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-0=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.104 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-2=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.106 INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-0]
2018-09-23 13:53:24.107 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.107 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.108 INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-2]
2018-09-23 13:53:24.108 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.108 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.207 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.207 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:25.111 TRACE 6384 --- [ listener-0-L-2] essageListenerContainer$ListenerConsumer : No records to process
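The assignments above fit a range-style assignment, assuming the default RangeAssignor. Two of worker1's consumers get messages-0 and messages-2 and the third gets nothing, and in the worker2 logs listener-0 gets messages-1. With 6 members and 3 partitions, every member gets 3 / 6 = 0 partitions. The first 3 % 6 = 3 members in member-id order get one extra, so half of the consumers stay idle. The sketch below is a single-topic simplification. The member ids are hypothetical. Real ones are generated when each consumer joins, so which thread gets which partition depends on how those ids sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-topic version of the range assignment strategy (hypothetical helper).
class RangeAssignmentSketch {

    /** Maps each member id to the partitions (0..numPartitions-1) it would own. */
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);                      // members are ordered by id
        int perMember = numPartitions / members.size(); // e.g. 3 / 6 = 0
        int extra = numPartitions % members.size();     // e.g. 3 % 6 = 3 members get one more
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < members.size(); i++) {
            int start = perMember * i + Math.min(i, extra);
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                partitions.add(p);
            }
            assignment.put(members.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical member ids: 2 workers x 3 consumer threads, topic with 3 partitions.
        List<String> members = Arrays.asList("w1-c0", "w1-c1", "w1-c2", "w2-c0", "w2-c1", "w2-c2");
        assign(members, 3).forEach((member, partitions) ->
                System.out.println(member + " -> " + partitions));
    }
}
```

With this strategy, adding consumer threads beyond the partition count only adds idle members.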
Trace logs from worker2:
2018-09-23 13:53:24.102 INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.104 INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.105 INFO 6401 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2018-09-23 13:53:24.106 INFO 6401 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [messages-1] for group mris-group
2018-09-23 13:53:24.111 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.115 INFO 6401 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[messages-1]
2018-09-23 13:53:24.118 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.118 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.189 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-09-23 13:53:24.189 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.202 TRACE 6401 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : Processing ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = <removed>, value = <removed>)
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 TRACE 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.210 TRACE 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.216 DEBUG 6401 --- [ listener-0-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:25.194 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:25.194 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
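The redelivery on worker2 follows from how committed offsets work. A committed offset is the position of the next record to read, and with manual acks it roughly only moves forward once acknowledge() has been called for the record. Since the listener threw before acknowledging, messages-1 still had offset 0 committed (worker2 logs "Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}"), so its new owner read offset 0 again. Below is a toy model with hypothetical names. It does not use the Kafka API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of committed offsets in one consumer group (hypothetical, not the Kafka API).
class ManualAckRedeliverySketch {

    // Committed position per partition = offset of the NEXT record the group should read.
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Where a consumer newly assigned to this partition starts reading. */
    long startingOffset(int partition) {
        // Assumption: with nothing committed, behave like auto.offset.reset=earliest.
        return committed.getOrDefault(partition, 0L);
    }

    /** Acknowledging the record at 'offset' commits offset + 1. */
    void acknowledge(int partition, long offset) {
        committed.put(partition, offset + 1);
    }

    public static void main(String[] args) {
        List<String> messages1 = Arrays.asList("record at offset 0");
        ManualAckRedeliverySketch group = new ManualAckRedeliverySketch();

        // worker1 owns partition 1; the listener throws, so acknowledge() is never called.
        System.out.println("worker1 reads: " + messages1.get((int) group.startingOffset(1)));

        // After the rebalance, worker2 owns partition 1 and resumes from the committed position.
        System.out.println("worker2 reads: " + messages1.get((int) group.startingOffset(1)));

        // Had the record been acknowledged, the new owner would start at offset 1 instead.
        group.acknowledge(1, 0);
        System.out.println("after ack, next offset: " + group.startingOffset(1));
    }
}
```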