Kafka uses a heartbeat mechanism, running on a separate thread, to check the health of a consumer. The consumer's heartbeat thread must send a heartbeat to the broker before session.timeout.ms expires.
heartbeat.interval.ms: The expected time between heartbeats to the
consumer coordinator when using Kafka's group management facilities.
Heartbeats are used to ensure that the consumer's session stays active
and to facilitate rebalancing when new consumers join or leave the
group.
session.timeout.ms: The timeout used to detect client failures when
using Kafka's group management facility. The client sends periodic
heartbeats to indicate its liveness to the broker. If no heartbeats
are received by the broker before the expiration of this session
timeout, then the broker will remove this client from the group and
initiate a rebalance.
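To make the relationship between the two settings concrete, here is a minimal sketch of how they might be put into consumer properties. The class and method names are hypothetical, and the one-third ratio is only the commonly recommended rule of thumb (the Kafka documentation says heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than 1/3 of it); the 45000 value is just an example.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
final class HeartbeatConfigSketch {

    // Builds the two heartbeat-related settings from a chosen session timeout.
    // Assumption: heartbeat interval is set to one third of the session timeout.
    static Properties heartbeatProps(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties props = heartbeatProps(45000); // example value only
        System.out.println("session.timeout.ms=" + props.getProperty("session.timeout.ms"));
        System.out.println("heartbeat.interval.ms=" + props.getProperty("heartbeat.interval.ms"));
    }
}
```

These properties would be merged with the usual bootstrap.servers, group.id and deserializer settings before being passed to a KafkaConsumer.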
Another mechanism for checking consumer liveness is polling. A consumer is expected to call poll() again before max.poll.interval.ms expires. If it does not (usually because processing takes too long), the consumer is likewise considered dead.
max.poll.interval.ms: The maximum delay between invocations of poll()
when using consumer group management. This places an upper bound on
the amount of time that the consumer can be idle before fetching more
records. If poll() is not called before expiration of this timeout,
then the consumer is considered failed and the group will rebalance in
order to reassign the partitions to another member.
If Kafka considers a consumer dead, either because no heartbeat arrived within session.timeout.ms or because poll() was not called within max.poll.interval.ms, the consumer cannot commit offsets and gets a CommitFailedException.
CommitFailedException: This exception is raised when an offset commit with KafkaConsumer.commitSync() fails with an unrecoverable
error. This can happen when a group rebalance completes before the
commit could be successfully applied. In this case, the commit cannot
generally be retried because some of the partitions may have already
been assigned to another member in the group.
As a result, because heartbeats are sent from a separate thread, a sleep in your code does not affect them. In your case, though, you can set max.poll.interval.ms to 10 seconds (10000) to get a CommitFailedException.
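The two checks can be illustrated with a toy model. This is only a sketch of the reasoning, not how the broker or client is actually implemented; the class, method names, and all numbers other than the 10-second max.poll.interval.ms are assumptions chosen for illustration.

```java
// Toy model of the two liveness checks; hypothetical, not Kafka code.
final class LivenessModel {

    // True if no heartbeat arrived within the session timeout.
    static boolean sessionExpired(long msSinceLastHeartbeat, long sessionTimeoutMs) {
        return msSinceLastHeartbeat > sessionTimeoutMs;
    }

    // True if poll() was not called within the maximum poll interval.
    static boolean pollExpired(long msSinceLastPoll, long maxPollIntervalMs) {
        return msSinceLastPoll > maxPollIntervalMs;
    }

    // A consumer is treated as dead if either check fails.
    static boolean consideredDead(long msSinceLastHeartbeat, long msSinceLastPoll,
                                  long sessionTimeoutMs, long maxPollIntervalMs) {
        return sessionExpired(msSinceLastHeartbeat, sessionTimeoutMs)
                || pollExpired(msSinceLastPoll, maxPollIntervalMs);
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 45_000;   // assumed value
        long maxPollIntervalMs = 10_000;  // the 10 seconds suggested above
        long heartbeatIntervalMs = 3_000; // assumed value
        long sleepMs = 15_000;            // assumed sleep inside the processing loop

        // The heartbeat thread keeps running during the sleep, so the time since
        // the last heartbeat stays near the heartbeat interval.
        System.out.println("session expired: "
                + sessionExpired(heartbeatIntervalMs, sessionTimeoutMs));
        // The processing thread is blocked, so the time since the last poll grows
        // to the full sleep duration.
        System.out.println("poll expired: " + pollExpired(sleepMs, maxPollIntervalMs));
        System.out.println("considered dead: "
                + consideredDead(heartbeatIntervalMs, sleepMs, sessionTimeoutMs, maxPollIntervalMs));
    }
}
```

In this model the sleep leaves the heartbeat check untouched and trips only the poll check, which is why lowering max.poll.interval.ms, rather than relying on session.timeout.ms, is what makes the commit fail.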
ConsumerRebalanceListener, though - OneCricketeer