0 votes

We are facing an issue with a set of Kafka consumers. Whenever there is activity on the Kafka cluster, such as rolling restarts of the brokers or reboots of the VMs running the brokers, our consumers fail to heartbeat and eventually leave the group. The log line below repeats for exactly one minute and corresponds to the commitSync call made in the application code while consuming messages from the topic:

[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Offset commit failed on partition <topic-name> at offset 455700: The coordinator is loading and hence can't process requests.

This one-minute interval corresponds to the default time for which the Kafka consumer client's commitSync Java API retries.
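For context, the commit call is the plain commitSync(); as far as I understand, it keeps retrying retriable errors until default.api.timeout.ms (60 s by default) elapses, which lines up with the one minute of repeated warnings. A simplified sketch (class and method names are placeholders):

```java
// Simplified sketch of the commit step (class and method names are placeholders).
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitStep {

    static void commitWithDefaults(KafkaConsumer<String, String> consumer) {
        // Blocks and retries retriable errors such as "coordinator is loading"
        // until default.api.timeout.ms (60 s by default) elapses, then throws
        // org.apache.kafka.common.errors.TimeoutException.
        consumer.commitSync();
    }

    static void commitWithExplicitTimeout(KafkaConsumer<String, String> consumer) {
        // The overload with an explicit timeout bounds the retry window instead.
        consumer.commitSync(Duration.ofSeconds(30));
    }
}
```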

After that minute of warnings there are no logs for the next five minutes.

Thereafter I see the following:

[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Attempt to heartbeat failed since group is rebalancing
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] Member consumer-13-837563e4-49e9-4bd1-aee4-cb21263e176a sending LeaveGroup request to coordinator <broker-host-name> (id: 2147483646 rack: null)
[Consumer clientId=consumer-13, groupId=delay-group-PostEnrollmentAppManagement] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

After this, messages pile up in the topic and the lag keeps increasing because there are no consumers left in the group. We restart the application hosting the consumers to resume consumption.

What can we do to avoid this? Should anything be done on the application consumer side to handle this?

Note: for this particular consumer we used the plain Apache Kafka client library. We normally build our consumers with spring-kafka, but we used the Apache client here because we wanted the consumer's pause and resume functions, which are not supported in the version of spring-kafka we use.
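For reference, this is roughly the pause/resume pattern we needed from the plain client; the backpressure check, topic name, and processing are simplified placeholders:

```java
// Rough sketch of the pause/resume pattern (backpressure check, topic name and
// record processing are placeholders).
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PauseResumeSketch {

    // Placeholder for whatever signals that downstream processing is overloaded.
    static boolean downstreamBusy() {
        return false;
    }

    static void pollLoop(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("topic-name"));
        while (true) {
            if (downstreamBusy()) {
                // Stop fetching records from all currently assigned partitions.
                consumer.pause(consumer.assignment());
            } else if (!consumer.paused().isEmpty()) {
                // Start fetching again once downstream has caught up.
                consumer.resume(consumer.paused());
            }
            // poll() must still be called while paused so max.poll.interval.ms
            // is not exceeded; it simply returns no records for paused partitions.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            records.forEach(record -> {
                // process record ...
            });
            consumer.commitSync();
        }
    }
}
```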


2 Answers

0 votes

You mention that you are doing maintenance on the Kafka brokers, i.e. restarting VMs (these should be controlled restarts of the Kafka service, not of the containers). If you want uninterrupted consumption during maintenance, you must consider the following:

  • Kafka brokers must be taken down for maintenance in a rolling-restart fashion; the Kafka documentation describes the procedure in detail
  • Do the above one broker at a time, or as many at once as the ISR settings in your cluster configs safely allow
  • The number of partitions and the replication factor must be greater than 1, so that a broker being down for maintenance does not leave the topic with offline partitions, which would cause producer/consumer failures and, in turn, data loss (see the sketch below)
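As a hedged sketch (topic name, partition count, and bootstrap address are placeholders), this is how a topic can be created with a replication factor greater than 1 and min.insync.replicas set so that one broker can be down without partitions going offline:

```java
// Hedged sketch: topic name, partition count and bootstrap address are placeholders.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateResilientTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // With replication factor 3 and min.insync.replicas=2, one broker can be
            // down for maintenance while producers using acks=all keep writing.
            NewTopic topic = new NewTopic("topic-name", 6, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```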

As a personal suggestion, take the controller down last in the rolling restart to avoid multiple controller switches and repeated reloading of the cluster metadata.

When we do a rolling restart, each broker takes some time to come back up after maintenance, i.e. the time needed to repopulate the partition metadata and for the under-replicated partition count to return to 0. Waiting for this is very important so as not to pressure the controller with repeated restarts, as many under-replicated partitions may cause offline/unavailable topic partitions depending on your configuration.
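One way to check this programmatically before moving on to the next broker is via the AdminClient; the sketch below (bootstrap address is a placeholder) counts partitions whose ISR is smaller than the replica set:

```java
// Hedged sketch: count under-replicated partitions via the AdminClient before
// restarting the next broker (bootstrap address is a placeholder).
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            long underReplicated = admin.describeTopics(topics).all().get().values().stream()
                    .flatMap(description -> description.partitions().stream())
                    .filter(partition -> partition.isr().size() < partition.replicas().size())
                    .count();
            System.out.println("Under-replicated partitions: " + underReplicated);
            // Only move on to the next broker once this count is back to 0.
        }
    }
}
```

The kafka-topics command line tool reports the same information with its --describe --under-replicated-partitions options.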

On top of the above, you can definitely tweak the following consumer configs:

  • heartbeat.interval.ms - must be lower than session.timeout.ms
  • session.timeout.ms
  • max.poll.interval.ms

The above can be tweaked based on your connection latencies and the state of your Kafka cluster. You can read more about them in the Confluent documentation on consumer configs.
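A sketch of how those settings might look together (the values are illustrative, not recommendations; tune them to your processing times and to how long your brokers take to recover during a rolling restart):

```java
// Illustrative values only, not recommendations; tune them to your processing
// time and to how long your brokers take to recover during a rolling restart.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTimeoutConfig {
    static Properties timeoutProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");      // broker-side liveness window
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000");   // keep well below session.timeout.ms (roughly 1/3)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");   // max allowed gap between poll() calls
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "120000"); // bounds how long commitSync() retries
        return props;
    }
}
```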

It is also possible that while cluster maintenance is in progress, the broker assigned as partition leader takes significantly longer to respond than session.timeout.ms / max.poll.interval.ms allow, in which case the consumer stops retrying. So tweaking the consumer configs and keeping cluster operations sane is the key to a healthy, continuous Kafka integration.

Note - in my personal experience doing cluster upgrades/maintenance on clusters with over 1 Gbps throughput, we do not face consumption issues (apart from a spike in request-handler/network-handler latencies because of the rebalance). Keeping the points above in mind and executing carefully, updates are manageable but definitely time consuming, as they have to be executed serially.

The Apache Kafka and Confluent documentation cover cluster maintenance and consumer configuration tweaks in more detail.

0 votes

Finally root-caused the issue. KafkaConsumer#commitSync was throwing an unchecked exception, TimeoutException, because the new group coordinator had not finished loading offsets within the one minute for which commitSync retries when it hits an error.

I hadn't handled this transient error. What made it difficult to debug was that I had spawned my consumer on a separate thread from the main thread. There was no exception handling in the consumer thread, nor was I examining the Future object on the main thread, so the TimeoutException was never logged either.
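The fix, roughly, was to catch the transient commit failure inside the poll loop and to make sure anything else thrown on the consumer thread is surfaced. A simplified sketch (topic name and logging are placeholders):

```java
// Simplified sketch of the fix (topic name and logging are placeholders):
// catch the transient commit timeout inside the loop, and catch everything
// else at the top of the consumer thread so failures are at least logged.
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;

public class ConsumerTask implements Runnable {

    private final KafkaConsumer<String, String> consumer;

    ConsumerTask(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("topic-name"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record -> {
                    // process record ...
                });
                try {
                    consumer.commitSync();
                } catch (TimeoutException e) {
                    // Transient: e.g. the new coordinator is still loading offsets
                    // after a broker restart. Log and keep polling; the offsets get
                    // committed on a later pass instead of killing the thread.
                    System.err.println("Offset commit timed out, will retry: " + e.getMessage());
                }
            }
        } catch (Exception e) {
            // Without this, an exception silently ends the consumer thread and
            // nothing is logged unless the Future is inspected on the main thread.
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```

If the task is submitted to an ExecutorService, calling get() on the returned Future (or logging inside the task as above) also ensures a consumer-thread failure is not silently swallowed.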