
I am using Kafka 2.11-1.0.1 (the Scala 2.11 build of Kafka 1.0.1). The application has consumers with concurrency=5 for topic 'X', which has 5 partitions.

When the application is restarted and a message is published to topic 'X' before partitions are assigned, the 5 consumers of topic 'X' find the group coordinator and send it a join group request. I expect a response from the group coordinator, but none is received.

I have checked the Kafka server logs but could not find any relevant entries, even at DEBUG log level.

When I run the describe consumer group command, I observe the following:

  1. The consumer group is rebalancing.
  2. The old consumers are listed with some lag.
  3. New consumers appear with random names, and their number keeps increasing over time.

New messages are published to topic 'X', but the consumers do not receive them.

heartbeat.interval.ms and session.timeout.ms are left at their defaults.

The problem occurs when a message is published before the partitions of topic 'X' are assigned to its consumers.

My question is: why does the rebalance never complete, so that the new consumers can start consuming the newly produced messages?

Answer:


The application has the following consumers in one consumer group:

  • Consumer A listens to Topic1, which has 1 partition. max.poll.interval.ms = 4 hours for this consumer.
  • Consumer B listens to Topic2, which has 5 partitions. Consumer B has concurrency=5. max.poll.interval.ms = 1 hour for this consumer.
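To make the units concrete, here is a minimal sketch that writes the two consumers' settings as plain Python dicts keyed by Kafka property names. Only max.poll.interval.ms is a real consumer property; the dict names, the "topic"/"concurrency" keys and the hours_to_ms helper are hypothetical. It assumes, as a simplified model, that the Java consumer sends max.poll.interval.ms as its rebalance timeout and that the coordinator uses the largest rebalance timeout among the group's members.

```python
# Hypothetical settings for the two listeners described above. Only the
# property name "max.poll.interval.ms" is real Kafka configuration; the
# variable names, extra keys and helper are illustrative.

HOUR_MS = 60 * 60 * 1000  # one hour in milliseconds


def hours_to_ms(hours: float) -> int:
    """Convert hours to the millisecond value Kafka expects."""
    return int(hours * HOUR_MS)


consumer_a = {"topic": "Topic1", "concurrency": 1, "max.poll.interval.ms": hours_to_ms(4)}
consumer_b = {"topic": "Topic2", "concurrency": 5, "max.poll.interval.ms": hours_to_ms(1)}

# One entry per consumer instance: A contributes 1 member, B contributes 5.
members = [consumer_a] * consumer_a["concurrency"] + [consumer_b] * consumer_b["concurrency"]

# Simplified model: each consumer uses max.poll.interval.ms as its rebalance
# timeout, and the coordinator waits up to the largest one in the group.
group_rebalance_timeout_ms = max(m["max.poll.interval.ms"] for m in members)

print(len(members), group_rebalance_timeout_ms)
```

Because all six instances share one consumer group, the 4-hour setting of Consumer A bounds how long a rebalance can wait, even for the Topic2 consumers whose own limit is 1 hour.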

What happens when the application restarts and one of the topics already has a published message:

  • When the application restarts, one consumer instance (consumerA1) is created and subscribes to Topic1. ConsumerA1 finds the group coordinator (GC) and sends a join group request.
  • ConsumerA1 gets a response from the GC and becomes the group leader. Up to this point, no other consumer has been initialized.
  • ConsumerA1 assigns the partitions and sends a SyncGroup request to the GC. A new generation is created, and the first rebalance completes.
  • A message has already been published to Topic1, so consumerA1 fetches it and starts processing it. Processing this message takes a long time (say, 2 hours).
  • Now the 5 consumer instances initialize one by one, and all of them subscribe to Topic2. They find the GC and send join group requests, but the GC does not respond to them.
  • When consumerA1 sends a heartbeat, the GC responds that a rebalance is in progress, but consumerA1 does not revoke its partitions and rejoin until its next poll, which happens only after it finishes processing the message.
  • According to the rebalance protocol (see: Nice article on rebalancing), the GC waits until all existing members have sent a join group request. In this case, the GC waits for the join group request from consumerA1. The maximum wait is max.poll.interval.ms, i.e. 4 hours in this case.
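The stalled join phase in the steps above can be illustrated with a small, simplified model of the coordinator's waiting rule. It is not the broker's actual code: the Member class, the join_phase_end function and the timings are hypothetical, and it assumes the phase ends when every known member has rejoined, or when the largest rebalance timeout expires (dropping members that never rejoined), whichever comes first.

```python
from dataclasses import dataclass
from typing import Optional

HOUR_MS = 60 * 60 * 1000


@dataclass
class Member:
    name: str
    rebalance_timeout_ms: int    # taken from max.poll.interval.ms
    rejoin_at_ms: Optional[int]  # when it sends JoinGroup; None = never in this phase


def join_phase_end(members: list[Member], start_ms: int) -> tuple[int, list[str]]:
    """Hypothetical model of the join phase.

    Returns (time the phase completes, names of members kept in the group).
    """
    deadline = start_ms + max(m.rebalance_timeout_ms for m in members)
    rejoin_times = [m.rejoin_at_ms for m in members]
    if all(t is not None and t <= deadline for t in rejoin_times):
        # Everyone rejoined in time: the phase ends with the last rejoin.
        end = max(max(rejoin_times), start_ms)
    else:
        # Someone is missing: the coordinator waits out the whole timeout.
        end = deadline
    kept = [m.name for m in members if m.rejoin_at_ms is not None and m.rejoin_at_ms <= end]
    return end, kept


# consumerA1 is busy for 2 hours and only rejoins on its next poll; the five
# Topic2 consumers send JoinGroup within the first few seconds.
members = [Member("consumerA1", 4 * HOUR_MS, rejoin_at_ms=2 * HOUR_MS)]
members += [Member(f"consumerB{i}", 1 * HOUR_MS, rejoin_at_ms=1_000 * i) for i in range(1, 6)]

end_ms, kept = join_phase_end(members, start_ms=1_000)
print(end_ms / HOUR_MS, len(kept))  # 2.0 6
```

In this model the five new consumers get no JoinGroup response for about 2 hours, which matches the symptom in the question: the group shows as rebalancing and new messages are not consumed until consumerA1 finishes and rejoins.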

Root Cause:

The group coordinator did not wait for all consumers to initialize after the application restart. As a result, an unnecessary first rebalance happened with only consumerA1 in the group, and consumerA1 fetched the message from its partition and started processing it.

Solution: To avoid such an unnecessary initial rebalance, Kafka provides a broker configuration that makes the group coordinator wait for more consumers to join a new consumer group before performing the first rebalance. Documentation:

group.initial.rebalance.delay.ms

I checked my Kafka server.properties and it was set to 0. I tried the default, i.e. 3 seconds (3000 ms). The initial rebalance was avoided: the GC waited 3 seconds on application restart, and in that time all the other consumers initialized and sent join group requests. Because the GC had received requests from all consumers, it responded without any delay, and the rebalance proceeded and completed successfully.
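The effect of the broker setting (group.initial.rebalance.delay.ms=3000 in server.properties) can be sketched with another simplified model. According to the Kafka upgrade notes for this feature (KIP-134), the initial rebalance of a new group is delayed by group.initial.rebalance.delay.ms and further delayed by the same amount as new members join, up to max.poll.interval.ms. The function first_rebalance_at and the join timings below are hypothetical; the model only approximates the broker's timer logic.

```python
HOUR_MS = 60 * 60 * 1000


def first_rebalance_at(join_times_ms: list[int], delay_ms: int, cap_ms: int) -> tuple[int, int]:
    """Hypothetical model of the delayed initial rebalance of a new group.

    join_times_ms: when each consumer sends its first JoinGroup request.
    delay_ms:      group.initial.rebalance.delay.ms on the broker.
    cap_ms:        upper bound on the total delay (max.poll.interval.ms).
    Returns (time the first rebalance is performed, members included in it).
    """
    times = sorted(join_times_ms)
    start = times[0]  # the first JoinGroup creates the group and starts the timer
    fire = start + min(delay_ms, cap_ms)
    window_start = start
    # While new members arrived during the last delay window, wait one more
    # window, never exceeding the cap in total.
    while fire - start < cap_ms and any(window_start < t <= fire for t in times):
        window_start = fire
        fire += min(delay_ms, cap_ms - (fire - start))
    included = sum(1 for t in times if t <= fire)
    return fire, included


# consumerA1 joins at t=0; the five Topic2 consumers join over the next 2.5 s.
joins = [0, 500, 1000, 1500, 2000, 2500]
cap = 4 * HOUR_MS

print(first_rebalance_at(joins, delay_ms=0, cap_ms=cap))     # (0, 1): only consumerA1
print(first_rebalance_at(joins, delay_ms=3000, cap_ms=cap))  # (6000, 6): all six members
```

With a delay of 0 the first generation contains only consumerA1, which is the root cause described above. With 3000 ms, members keep arriving during the first window, so in this model the coordinator waits one more quiet window; the exact wait is less important than the result that all six consumers are assigned partitions in the first generation.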