
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html mentions that "As long as the consumer is sending heartbeats in regular intervals, it is assumed to be alive, well and processing messages from its partitions. In fact, the act of polling for messages is what causes the consumer to send those heartbeats. If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance."

Similarly, https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html specifies that "The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be considered dead and its partitions will be assigned to another process."

In my application, processing the messages received from the previous poll() can take up to hours before the next poll() is called. Note: I disable auto commit because I don't always know how long it will take to process all of the previous messages.

a) Will that cause the group coordinator to consider the consumer dead or inactive?

b) Are there other ways to send heartbeat messages to the group coordinator to keep the session active?

c) Will session.timeout.ms have any effect here on keeping consumer alive/active?


1 Answer


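a) Yes. Heartbeats are sent as part of calling poll(), so if you do not call poll() for longer than session.timeout.ms, Kafka considers the consumer dead and reassigns its partitions to other members of the group.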

b) As an alternative, you could call poll() during processing (i.e., interleaved with processing) to trigger a heartbeat, and seek back before each "real" poll so that records returned by these extra polls are not skipped. Using an extra processing thread would also be possible, allowing the main thread to keep calling poll() regularly to send heartbeats. However, you need to ensure that failures on the processing thread are detected, which is tricky to do correctly!
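The extra-thread variant can be sketched as below. This is a minimal, Kafka-free illustration of the threading pattern only: `processWithKeepAlive` and the `keepAlive` callback are hypothetical names, and in a real application `keepAlive` would call the consumer's poll() (KafkaConsumer is not thread-safe, so poll() stays on the main thread, which is what this sketch does). The records returned by those extra polls still have to be handled, e.g. with the seek-back approach above. The key point is that the worker's exception is rethrown on the main thread instead of silently disappearing.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HeartbeatWhileProcessing {

    // Runs processBatch on a worker thread. While it runs, the calling thread
    // invokes keepAlive every intervalMs ms (hypothetical stand-in for poll()).
    // Returns how many times keepAlive ran; rethrows any failure of the worker.
    public static int processWithKeepAlive(Runnable processBatch, Runnable keepAlive,
                                           long intervalMs) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        int keepAliveCalls = 0;
        try {
            Future<?> result = worker.submit(processBatch);
            while (true) {
                try {
                    // Wait at most intervalMs for the batch to finish.
                    result.get(intervalMs, TimeUnit.MILLISECONDS);
                    return keepAliveCalls;
                } catch (TimeoutException stillRunning) {
                    // Batch still in progress: keep the session alive from this thread.
                    keepAlive.run();
                    keepAliveCalls++;
                } catch (ExecutionException failed) {
                    // The worker threw: surface the failure here instead of losing it.
                    Throwable cause = failed.getCause();
                    if (cause instanceof Exception) throw (Exception) cause;
                    if (cause instanceof Error) throw (Error) cause;
                    throw failed;
                }
            }
        } finally {
            worker.shutdownNow();
        }
    }

    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated 350 ms batch with a keep-alive roughly every 100 ms.
        int calls = processWithKeepAlive(() -> sleep(350),
                () -> System.out.println("keep-alive (stand-in for poll())"), 100);
        System.out.println("keep-alive calls: " + calls);
        try {
            processWithKeepAlive(() -> { throw new IllegalStateException("bad record"); },
                    () -> { }, 100);
        } catch (IllegalStateException e) {
            System.out.println("worker failure detected: " + e.getMessage());
        }
    }
}
```

Using `Future.get` with a timeout lets one loop serve two purposes: it paces the keep-alive calls and it is also where the worker's outcome (success or exception) is observed.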

c) You could increase the timeout value. However, this might not be what you want: if your consumer fails, the failure is detected only after this longer timeout expires.

The problem you describe is a known one, and the consumer behavior might change in the future; it is already being discussed. See KIP-62 for more details.

Update

Since Kafka 0.10.1, a consumer has two separate configuration parameters: max.poll.interval.ms and session.timeout.ms. The first is the maximum time allowed between two consecutive calls to poll(), while the second is the heartbeat timeout. Heartbeats are now sent from a background thread and are thus decoupled from calling poll(). Therefore, you can increase max.poll.interval.ms to cover long processing without the drawback described in (c): if the whole client fails and stops sending heartbeats, this is still detected within session.timeout.ms.
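For a 0.10.1+ client, the settings from the question might look roughly like this. It is a sketch under assumptions: the broker address, group id, and the 2-hour processing budget are placeholders, and the session/heartbeat values are just example choices. Only the configuration keys themselves are real consumer settings.

```java
import java.util.Properties;

public class LongProcessingConsumerConfig {

    // Builds consumer settings for batches that may take a long time to process
    // (requires Kafka 0.10.1+, where heartbeats come from a background thread).
    public static Properties build(long maxProcessingMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "long-running-group");     // hypothetical group name
        props.put("enable.auto.commit", "false");        // commit manually after processing
        // Maximum time between two poll() calls; exceeding it triggers a rebalance.
        props.put("max.poll.interval.ms", Long.toString(maxProcessingMs));
        // Heartbeat timeout; can stay short so a crashed client is detected quickly.
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "3000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(2 * 60 * 60 * 1000L); // allow up to 2 hours per batch
        System.out.println(props.getProperty("max.poll.interval.ms")); // prints 7200000
    }
}
```

The resulting `Properties` object would then be passed to `new KafkaConsumer<>(props)`. The design choice is to make max.poll.interval.ms match the worst-case processing time while keeping session.timeout.ms small.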