0
votes

I just want to understand kafka rebalance.This is my listener method. I have configured RetryTemplate with consumer factory to retry 20 times with a backoff delay of 20secs. I'm using spring-kafka 1.2.2(we are planning to upgrade client) and using manual acks.

@KafkaListener(id = "${kafka.listener-id}", topics = "${kafka.topic}")
public void listen(final ConsumerRecord<String, String> consumerRecord,
                   final Acknowledgment acknowledgment) throws ServiceResponseException {}

    if (true){
        System.out.println("throwing exception ");
        throw new RuntimeException();
    }

    try {
        acknowledgment.acknowledge();
        LOGGER.info("Kafka acknowledgment sent for Transaction ID:");
    } catch (Exception e) {
        LOGGER.info("Exception encountered when acking record with transaction id: {}");
    } 
}

I have 2 workers that has concurrency 2 each. On kafka I have 3 partitions. I started one worker and 3 partitions are assigned to worker1. And then I sent a message. And a RuntimeException is thrown in listener and this happens for 20 times with 20 sec delay. Then when I started worker2 kafka rebalance triggers but partitions are not yet assigned. worker1 fails with message "Error while processing: ConsumerRecord"(after getContainerProperties().getShutdownTimeout())and and then all consumers join the group. And now the same message is delivered to worker2.

1) And this is working as I needed it to work. But I have a question, when a rebalance triggers why is partition assignment not happening immediately instead it waits for the worker1 to stop completely(waiting for getContainerProperties().getShutdownTimeout()) and then all consumers from worker1 and worker2 joins groups.

2) And during rebalance I observed that consumers stop calling poll(from logs below). Is it true?

Trace logs from worker 1:

2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-2-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.384 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.384 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:53.977 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.977 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.008 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:54.008 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.023  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-0] for group mris-group
2018-09-23 13:52:54.023 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.023 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.081  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-1] for group mris-group
2018-09-23 13:52:54.081 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.081 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.241  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-2] for group mris-group
2018-09-23 13:52:54.241 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.241 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-2]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-0]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:52:54.265  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:09.355 DEBUG 6384 --- [ listener-1-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Interrupting invoker
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker timed out while waiting for shutdown and will be canceled.
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-1]
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:24.101 ERROR 6384 --- [ listener-1-L-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = [email protected], value = <removed>])

org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:305) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:55) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:34) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.2.2.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method) [na:1.8.0_162]
    at org.springframework.retry.backoff.ThreadWaitSleeper.sleep(ThreadWaitSleeper.java:30) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:83) ~[spring-retry-1.2.0.RELEASE.jar:na]
    ... 14 common frames omitted

2018-09-23 13:53:24.101  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.101  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-0] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-2] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.103  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-0=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.104 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-2=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.106  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-0]
2018-09-23 13:53:24.107 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.107 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.108  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-2]
2018-09-23 13:53:24.108 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.108 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.207 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.207 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:25.111 TRACE 6384 --- [ listener-0-L-2] essageListenerContainer$ListenerConsumer : No records to process

Trace logs from worker2:

2018-09-23 13:53:24.102  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.104  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.106  INFO 6401 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-1] for group mris-group
2018-09-23 13:53:24.111 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.115  INFO 6401 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-1]
2018-09-23 13:53:24.118 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.118 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.189 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-09-23 13:53:24.189 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.202 TRACE 6401 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : Processing ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = [email protected], value = <removed>)
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 TRACE 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.210 TRACE 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.216 DEBUG 6401 --- [ listener-0-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception 
2018-09-23 13:53:25.194 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:25.194 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
1

1 Answers

0
votes

Versions prior to 1.3 had a very complicated threading model to avoid rebalancing due to a slow listener. KIP-62 enabled us to use a much simpler threading model in 1.3 and later.

1.2.x is no longer supported, and I don't have the time (or inclination) to go back to figure out what happened. Please upgrade to 1.3.7 (or even better, 2.1.10).