0
votes

I am writing an consumer application to pick records from kafka stream and process it using spring-kafka. My processing steps are as below :

Getting records from stream --> dump it into a table --> Fetch records and call API --> API will update records into a table --> calling Async Commit()

It seems in some scenarios, the API processing taking more time because of more records are being fetched and we are getting below errors?

Member consumer-prov-em-1-399ede46-9e12-4388-b5b8-f198a4e6a5bc sending LeaveGroup request to coordinator apslt2555.uhc.com:9095 (id: 2147483577 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

I know this can be handled by reducing max.poll.records or by increasing max.poll.interval.ms. What I am trying to understand if I set max.poll.records to 10 then what would be poll() behavior? Is it going to take 10 records from stream wait for these records to be committed and then will go for next 10 records ? When the next poll occurs ?Is it also going to impact performance as we are reducing max.poll.records from default 500 to 10.

Do I also have to increase max.poll.interval.ms. Probably make it 10 minutes. Is there any down impact that I should be aware of while changing these values ? Except these parameters, is there any other way to handle these errors ?

1
This might help - mike

1 Answers

2
votes

max.poll.records allows batch processing consumption model in which records are collected in memory before flushing them to another system. The idea is to get all the records by polling from kafka together and then process that in memory in the poll loop.

If you decrease the number then the consumer will be polling more frequently from kafka. This means it needs to make network call more often. This might reduce performance of kafka stream processing.

max.poll.interval.ms controls the maximum time between poll invocations before the consumer will proactively leave the group. If this number increases then it will take longer for kafka to detect the consumer failures. On the other hand, if this value is too low kafka might falsely detect many alive consumers as failed thus rebalancing more often.