
We currently have around 80 applications (around 200 K8s replicas) writing 16-17 million records to Kafka every day, and some of those records fail intermittently with timeout and rebalance exceptions. The failure rate is less than 0.02%.

We have validated and configured all the parameters as suggested in other Stack Overflow answers, but we are still running into multiple issues.

One issue is rebalancing, and we are facing it on both the producer and the consumer side. On the consumer side we use auto commit, and sometimes Kafka rebalances and the consumer receives duplicate records. We did not add a duplicate check because it would reduce the processing rate, and the duplicate rate is below 0.1%. We are thinking of switching to manual commit and managing offsets in a database, but we first need to understand, from the Kafka brokers' perspective, why rebalancing happens on a daily basis.
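A minimal sketch of the manual-commit part of that idea (broker addresses, group id and topic name are placeholders; the database side is not shown):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-0:9092,broker-1:9092"); // placeholder brokers
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");                    // placeholder group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");                      // turn off auto commit
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("xxTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // our processing / DB write
                }
                consumer.commitSync(); // commit only after the whole batch is processed
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the real processing
    }
}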

Producer Error:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

The second issue is a TimeoutException. It happens intermittently for some of the apps: the producer tries to send a record, the record is added to a batch, but it cannot be delivered before the request timeout, which we have already increased to 5 minutes. Ideally Kafka should retry at some interval. While debugging, we found that the record accumulator expires the previous batches without even trying to send them when the request times out - is that the expected behavior? Can we add a retry for this somehow?

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for xxTopic-41:300014 ms has passed since batch creation.
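For reference, these are the producer timeout settings involved, as we understand them: in Kafka 2.1+ request.timeout.ms bounds a single request, while delivery.timeout.ms bounds the total time a record may spend in the accumulator, including retries. A minimal sketch with illustrative values:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutConfig {
    // Illustrative values only; tune for your environment.
    public static Properties timeoutProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");    // timeout for one request
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "300000");  // send() + retries + in-flight, in total
        props.put(ProducerConfig.RETRIES_CONFIG, "2147483647");          // effectively unlimited; bounded by delivery.timeout.ms
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        return props;
    }
}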

Configuration:
1. 5 brokers and 3 ZooKeeper nodes - Kafka version 2.2.
2. Brokers run in Kubernetes as a StatefulSet.
3. Each broker has 32 GB of memory and 8 CPUs, as recommended by Confluent for production.
4. The topic has 200 partitions and 8 consumer replicas.
5. Each consumer handles only around 25-30 threads. Each consumer has 4 GB of memory and 4 CPUs.


@Value("${request.timeout:300000}") <br/>
private String requestTimeOut;

@Value("${batch.size:8192}") <br/>
private String batchSize;

@Value("${retries:5}") <br/>
private Integer kafkaRetries;

@Value("${retry.backoff.ms:10000}") <br/>
private Integer kafkaRetryBackoffMs;
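These values are wired into the producer roughly as follows (a simplified sketch, not our exact code; the bootstrap.servers property and bean name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProducerConfig {

    @Value("${bootstrap.servers:broker-0:9092}") // placeholder default
    private String bootstrapServers;

    @Value("${request.timeout:300000}")
    private String requestTimeOut;

    @Value("${batch.size:8192}")
    private String batchSize;

    @Value("${retries:5}")
    private Integer kafkaRetries;

    @Value("${retry.backoff.ms:10000}")
    private Integer kafkaRetryBackoffMs;

    @Bean
    public Producer<String, String> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeOut);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.RETRIES_CONFIG, String.valueOf(kafkaRetries));
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(kafkaRetryBackoffMs));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}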

We are on the development team and don't have much insight into the networking side, so we need help understanding whether this is related to network congestion or whether we need to improve something in the application itself. We did not face any issues when the load was below 10 million records per day; with a lot of new apps now sending messages and the increased load, we are seeing the two issues above intermittently.

When we analyzed the issue further, we found that the producer errors occur for low-throughput APIs (around 10,000 requests per day) but not for the widely used APIs (around 1 million per day). We also see many idle-connection timeouts from the former app, which suggests its request rate is quite low. So when a request is sent through the producer and the sender thread is not available, the record is added to a batch, and it is only attempted again on the next request - at which point it can time out without a retry.

2 Answers


Regarding the error on the producer side, make sure that all the brokers that are partition leaders for your topic are included in the broker list you give your clients. You can find out which broker is the leader of a partition by running:

./kafka-topics.sh \
    --zookeeper zookeeper-host:2181 \
    --describe \
    --topic your-topic-name 


Topic: your-topic-name   PartitionCount:3    ReplicationFactor:1 
Topic: your-topic-name   Partition: 0    Leader: 2   Replicas: 2 Isr: 2
Topic: your-topic-name   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: your-topic-name   Partition: 2    Leader: 1   Replicas: 1 Isr: 1

In the above example, you'd have to provide all the addresses for brokers 0, 1 and 2.
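In practice that means listing several (ideally all) of your brokers in bootstrap.servers, for example (hostnames and ports are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BootstrapServersExample {
    public static Properties props() {
        Properties props = new Properties();
        // List every broker (or at least more than one) so the client can
        // always reach the cluster and fetch the current partition leaders.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-0:9092,kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092");
        return props;
    }
}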


I think I'd have to disagree with Giorgos, though I may be just misunderstanding his point.

Producers and consumers only ever talk to the partition leader - the other replicas are just on standby in case the leader goes down, and they act as consumers to the leader to keep their data up to date.

However, on application startup the client code can connect to any of the brokers, and it discovers the partition leaders when it fetches metadata. This metadata is then cached on the client side, but if there is a change in leadership at the broker end you will see a NotLeaderForPartitionException, which prompts the client to fetch metadata again and get the current set of partition leaders. Leader election does take time, so there will be some delay during this process, but it is a sign that replication and broker-side resilience are working correctly.

On the consumer side, manual commit vs autocommit will make no difference if you use the standard offset commit topic - autocommit just means that each time you poll, the previously processed messages are committed (actually possibly not on every poll), but this is likely the same thing you would do manually. Storing offsets in a database does help keep things transactional if processing a message means updating data in that database - in that case you can commit the offsets and the processed data in the same DB transaction.
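As a rough sketch of that pattern (the database access methods here are placeholders for your real JDBC/transaction code; broker, group and topic names are also placeholders):

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DbOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-0:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-offset-group");        // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        // offsets live in the DB, not Kafka
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("xxTopic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // nothing to do: the offset is persisted with each processed record below
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // after a rebalance, resume from the offsets stored in the database
                    for (TopicPartition tp : partitions) {
                        consumer.seek(tp, loadOffsetFromDb(tp));
                    }
                }
            });

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // one DB transaction: the processed data plus the next offset to read
                    saveResultAndOffsetInOneTransaction(record);
                }
            }
        }
    }

    // Placeholders for the real database code.
    private static long loadOffsetFromDb(TopicPartition tp) { return 0L; }
    private static void saveResultAndOffsetInOneTransaction(ConsumerRecord<String, String> record) { }
}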

Basically, as I'm fairly sure you realise, duplicates are normally an inevitable part of consumer scalability, because any consumer process can pick up a partition and carry on from the last committed offset. Duplicates happen when a consumer has processed part of a batch and is then considered to be offline, either because the process has actually died or because it took too long to process the batch. To avoid duplicates you have to ensure that every processed message is associated with a commit, in the same transaction. The cost is normally throughput, but as you suggest, committing each message manually rather than at batch level, and storing the offsets in the same DB transaction, can prevent duplicate consumption.

On the question of why rebalancing is happening, there are only two triggers - a change in the number of partitions on the topic, or a perceived change in consumer group membership. There are two possible reasons for the latter: the heartbeat thread has stopped, which normally means the consumer application has stopped, or processing a batch has exceeded max.poll.interval.ms (this configuration is intended to stop livelock, where a consumer is alive and sending heartbeats but has stopped polling). The latter is the usual cause of rebalances outside application restarts - inevitably there is sometimes a bit of lag somewhere in any system, so consumer rebalances are generally considered normal as long as a bit of delay processing a batch doesn't trigger them too often.
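For reference, these are the settings involved (values are illustrative; the point is how they relate to each other):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTuning {
    public static Properties props() {
        Properties props = new Properties();
        // Fewer records per poll means each pass through the poll loop finishes sooner.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        // Must comfortably exceed the worst-case time to process one batch,
        // otherwise the consumer is assumed to be livelocked and a rebalance starts.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // Heartbeats come from a background thread; if none arrive within
        // session.timeout.ms the consumer is considered dead and a rebalance starts.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        return props;
    }
}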

I'm not sure about the producer-side issues - in my case I handle duplicates in the consumer, and on the producer I just allow a high number of retries, with acks=all (essential if you can't afford to lose messages) and a maximum of 1 in-flight request (to ensure ordering). Are the producer timeouts related to the NotLeaderForPartitionException? Are they just a consequence of leader election?
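For what it's worth, a sketch of that producer setup (values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SafeProducerConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");                           // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, "2147483647");                 // retry through leader elections etc.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");   // preserve ordering across retries
        return props;
    }
}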

(there is some more detail at https://chrisg23.blogspot.com/2020/02/kafka-acks-configuration-apology.html - a slightly rambling blog post, but may be interesting)