3
votes

In my Kafka consumer, if processing a message takes more than 5 minutes, the message is processed again. I have configured the consumer with increased max.poll.interval.ms and session.timeout.ms values:

max.poll.interval.ms=7200000 (2 hrs)
session.timeout.ms=7200000 (2 hrs)
request.timeout.ms=7206000 (~2 hrs)

This worked at first, but after that the group started rebalancing, and I got this error on the consumer:

2019-10-18 15:29:51.739 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=processgroup] (Re-)joining group

1
Are you using Kafka Spark Streaming? - Nitin
No, I am using a simple [Kafka + ZooKeeper + Kafdrop] stack, all in the same Docker Compose file. - Mnib Yousr

1 Answer

1
votes

You are on the right track. To avoid the rebalance, you also need to set session.timeout.ms in the consumer and group.max.session.timeout.ms in the Kafka broker.

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

Note: If max.poll.interval.ms is increased a lot, group rebalances can be delayed, because a consumer only joins a rebalance when poll() is called.
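To make the two liveness checks described above concrete, here is a minimal sketch in plain Python. It is only a model of the rules as stated in this answer, not Kafka's actual implementation; the function name `member_status` and its parameters are made up for illustration.

```python
def member_status(now_ms, last_heartbeat_ms, last_poll_ms,
                  session_timeout_ms, max_poll_interval_ms):
    """Hypothetical model of the two timeouts (not Kafka code).

    Returns "alive" or a short reason why the consumer would be
    removed from the group, triggering a rebalance.
    """
    # No heartbeat for longer than session.timeout.ms: considered dead.
    if now_ms - last_heartbeat_ms > session_timeout_ms:
        return "dead: no heartbeat within session.timeout.ms"
    # Heartbeats still arrive, but poll() was not called in time.
    if now_ms - last_poll_ms > max_poll_interval_ms:
        return "failed: poll() not called within max.poll.interval.ms"
    return "alive"


# A message that has been processing for 6 minutes since the last poll(),
# with heartbeats still being sent (last one 1 second ago).
six_minutes = 6 * 60 * 1000
print(member_status(six_minutes, six_minutes - 1000, 0, 10000, 300000))
print(member_status(six_minutes, six_minutes - 1000, 0, 10000, 7200000))
```

With a 5 minute max.poll.interval.ms the first call reports a failed consumer, which matches the reprocessing described in the question; with the 2 hour value the same consumer is still considered alive.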

request.timeout.ms: The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

Note: This does not directly affect rebalancing, but it still needs to be set, because it must always be larger than max.poll.interval.ms; otherwise a configuration error is thrown.
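As a quick sanity check of the relationship in the note above, the settings can be compared before starting the consumer. This is a rough sketch: `check_client_timeouts` is a hypothetical helper, the config is a plain dict rather than a real client object, and the rule it encodes is the one stated in this answer.

```python
def check_client_timeouts(config):
    """Return a list of problems found in a consumer config dict.

    Hypothetical helper: it only encodes the rule from this answer,
    that request.timeout.ms must exceed max.poll.interval.ms.
    """
    problems = []
    if config["request.timeout.ms"] <= config["max.poll.interval.ms"]:
        problems.append(
            "request.timeout.ms must be larger than max.poll.interval.ms"
        )
    return problems


# Values from the question.
consumer_config = {
    "max.poll.interval.ms": 7200000,
    "session.timeout.ms": 7200000,
    "request.timeout.ms": 7206000,
}
print(check_client_timeouts(consumer_config))
```

For the values in the question the list is empty, so this particular rule is already satisfied there.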

session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance.

Note: Setting session.timeout.ms is a little tricky. Changing it alone will not work; you also need to check the Kafka broker setting group.max.session.timeout.ms, which needs to be increased as well.

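group.max.session.timeout.ms (in Kafka broker): The maximum allowed session timeout for registered consumers. Its default value depends on the broker version (30000 ms in older releases, larger in newer ones), so check the documentation for your version. In any case it is far below the 2 hours configured in the question.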
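The broker only accepts a consumer's session.timeout.ms if it lies between group.min.session.timeout.ms and group.max.session.timeout.ms. The sketch below models that range check in plain Python; `session_timeout_allowed` is a hypothetical name, and the broker values passed in are assumptions (the 30000 ms maximum is the figure mentioned above, the 6000 ms minimum is assumed), so check your own broker's settings.

```python
def session_timeout_allowed(session_timeout_ms, broker_min_ms, broker_max_ms):
    """Model of the broker-side range check (hypothetical helper).

    The consumer's session.timeout.ms must fall within
    [group.min.session.timeout.ms, group.max.session.timeout.ms].
    """
    return broker_min_ms <= session_timeout_ms <= broker_max_ms


requested = 7200000  # session.timeout.ms from the question (2 hours)

# Assumed broker limits: 6000 ms minimum, 30000 ms maximum.
print(session_timeout_allowed(requested, 6000, 30000))

# After raising group.max.session.timeout.ms on the broker to 2 hours.
print(session_timeout_allowed(requested, 6000, 7200000))
```

Under these assumptions the first check fails and the second passes, which is why the broker needs a line such as group.max.session.timeout.ms=7200000 in its configuration before the consumer's 2 hour session timeout can take effect.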