
I have a single Kafka consumer that retrieves records from Kafka using the poll mechanism. Sometimes this consumer gets kicked out of the consumer group because it fails to call poll within the session timeout (session.timeout.ms), which I have configured to 30 s. My question is: if this happens, will a later call to poll re-add the consumer to the group, or do I need to do something else?

I am using Kafka version 0.10.2.1.

Edit: Aug 14 2018

Some more info: after a poll, I never process the records in the same thread. I simply add all the records to a separate queue (serviced by a separate thread pool) for processing.


3 Answers

2 votes

poll() will send a "join group" request if the consumer is not currently a member of the group, and the consumer will rejoin the group (unless some error prevents it). Note that, depending on the group's state (other members in the group, topics subscribed to in the group), the consumer may or may not get the same partitions it was consuming before it was kicked out. This uncertainty does not apply if the consumer is the only member of the group: it will again be assigned all partitions of the topics it subscribes to.

2 votes

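A consumer gets kicked out if it fails to send a heartbeat to the group coordinator within session.timeout.ms. In clients before 0.10.1, heartbeats are sent only from within poll(), so long processing between polls delays them. Since 0.10.1 (KIP-62), a background thread sends the heartbeats, and the time between polls is bounded separately by max.poll.interval.ms.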

You need to look at how long it takes to process a single record: multiplied by the number of records returned by one poll, that time may exceed the session.timeout.ms value you have set (30 s). Try increasing that. Also keep max.poll.records at a lower value; this setting caps how many records a single call to poll returns. If you fetch too many records, your consumer might still get kicked out even with a large session.timeout.ms, and the group will enter a rebalance.
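As a rough worked example of that sizing, assume records are processed in the polling thread at a roughly constant cost. The time between polls is then about max.poll.records × per-record time, and it has to stay under the timeout. The 100 ms per record and the 80% safety margin below are hypothetical numbers, and PollBudget is an illustrative helper, not part of the Kafka API.

```java
// Hypothetical helper (not a Kafka API): checks whether a full batch from one
// poll can be processed before the timeout expires. Numbers are illustrative.
final class PollBudget {

    // Largest batch that fits into safetyPercent% of the timeout, assuming
    // every record takes roughly perRecordMs to process. Never returns < 1.
    static int maxSafeRecords(long timeoutMs, long perRecordMs, int safetyPercent) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = timeoutMs * safetyPercent / 100; // integer math, no rounding surprises
        return (int) Math.max(1, budgetMs / perRecordMs);
    }

    // True if a batch of `records` records is expected to finish before the timeout.
    static boolean fitsInTimeout(int records, long perRecordMs, long timeoutMs) {
        return (long) records * perRecordMs < timeoutMs;
    }

    public static void main(String[] args) {
        long timeoutMs = 30_000; // the 30 s timeout from the question
        long perRecordMs = 100;  // hypothetical: 100 ms per record

        // 500 records (the max.poll.records default) * 100 ms = 50 s > 30 s
        System.out.println(fitsInTimeout(500, perRecordMs, timeoutMs)); // false
        // 80% of 30 s = 24 s, and 24 000 ms / 100 ms = 240 records
        System.out.println(maxSafeRecords(timeoutMs, perRecordMs, 80)); // 240
    }
}
```

Under these assumptions, the default of 500 records needs about 50 s per batch and would blow through a 30 s timeout, while about 240 records per poll leaves some headroom.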

1 vote

Vahid already explained what happens when a kicked-out consumer rejoins the group. You can also tune the configuration below so that the consumer is not kicked out of the group in the first place.

  1. max.poll.records: the maximum number of records returned by a single call to poll() (default: 500)
  2. max.poll.interval.ms: the maximum time allowed between calls to poll(), i.e. how long you have to process the records returned by one poll (default: 300000 ms, i.e. 5 min)
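A minimal sketch of setting these two properties (plus the question's session.timeout.ms) on the java.util.Properties object that is normally passed to the KafkaConsumer constructor. The broker address, group id, and chosen values are hypothetical; to keep the snippet free of dependencies, the property names are plain strings rather than ConsumerConfig constants, and the consumer itself is not created.

```java
import java.util.Properties;

// Sketch of consumer settings for the tuning described above.
// All values are illustrative assumptions, not recommendations.
final class ConsumerTuning {

    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "my-group");                 // hypothetical group name
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Fewer records per poll() means less work between consecutive polls.
        props.setProperty("max.poll.records", "100");          // default: 500
        // Upper bound on the time between poll() calls before the consumer
        // is considered failed and removed from the group.
        props.setProperty("max.poll.interval.ms", "600000");   // 10 min; default: 300000
        // Heartbeat-based liveness timeout (30 s, as in the question).
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        // With kafka-clients on the classpath, these would be passed to
        // new KafkaConsumer<String, String>(props).
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```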

KIP-62, which introduced max.poll.interval.ms and the background heartbeat thread in Kafka 0.10.1.0, describes the effect of these settings.

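Alternatively, since you mentioned that you have only one consumer, you can assign partitions manually with KafkaConsumer#assign instead of subscribing. In this mode the consumer does not take part in group membership, so no rebalance ever happens.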