We have written a Kafka consumer that polls for data based on config. Each poll returns about 400 Avro records, which we buffer. After buffering, we seek to the end offset of the batch. When the buffer reaches 2,000 records, we write them to HDFS on executor service threads and wait for all writes to finish via Future.get. We keep appending to the same files in HDFS (a staging folder) until we reach a commit size of 10,000 records. On reaching the commit size, we move the files from the staging directory to the final output location in HDFS (this gives us an atomic commit); the next flush of buffered data then creates new files in the staging directory. This is done to avoid small files in HDFS and to limit open file handles. After committing the files to the output location, we commit the offsets asynchronously.
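For context, here is a minimal sketch of that loop. Everything here (topic, paths, pool size, chunk size, the Avro append stub) is made up for illustration; it shows the shape of the pipeline, not our actual code:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BufferingHdfsSink {
    private static final int FLUSH_SIZE = 2_000;   // flush buffer to staging at this size
    private static final int COMMIT_SIZE = 10_000; // move staging -> output at this size
    private static final int CHUNK = 500;          // hypothetical records per write task

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9093");
        props.put("group.id", "metadata.events");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");

        ExecutorService pool = Executors.newFixedThreadPool(4);
        FileSystem fs = FileSystem.get(new Configuration());
        Path staging = new Path("/data/staging");
        Path output = new Path("/data/output");

        List<ConsumerRecord<byte[], byte[]>> buffer = new ArrayList<>();
        long recordsSinceCommit = 0;

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic.metadata.events"));
            while (true) {
                // Each poll returns ~400 Avro records, which we buffer in memory.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> r : records) buffer.add(r);

                // After buffering, seek each partition to the end offset of the batch.
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<byte[], byte[]>> part = records.records(tp);
                    consumer.seek(tp, part.get(part.size() - 1).offset() + 1);
                }

                if (buffer.size() >= FLUSH_SIZE) {
                    // Append the buffered records to staging files on executor threads,
                    // then block until every write has finished.
                    List<Future<?>> writes = new ArrayList<>();
                    List<ConsumerRecord<byte[], byte[]>> batch = new ArrayList<>(buffer);
                    for (int i = 0; i < batch.size(); i += CHUNK) {
                        List<ConsumerRecord<byte[], byte[]>> slice =
                                batch.subList(i, Math.min(i + CHUNK, batch.size()));
                        writes.add(pool.submit(() -> appendToStaging(fs, staging, slice)));
                    }
                    for (Future<?> f : writes) f.get();
                    recordsSinceCommit += buffer.size();
                    buffer.clear();
                }

                if (recordsSinceCommit >= COMMIT_SIZE) {
                    // "Atomic" commit: rename staging files into the final output
                    // directory, then commit offsets asynchronously.
                    for (FileStatus st : fs.listStatus(staging)) {
                        fs.rename(st.getPath(), new Path(output, st.getPath().getName()));
                    }
                    consumer.commitAsync();
                    recordsSinceCommit = 0;
                }
            }
        } finally {
            pool.shutdown();
        }
    }

    private static void appendToStaging(FileSystem fs, Path staging,
                                        List<ConsumerRecord<byte[], byte[]>> batch) {
        // Placeholder for the actual Avro append; the details are not relevant here.
    }
}
```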
After about 5 minutes, once 4-5 successful writes and offset commits have gone through, we start getting this log:
2020-05-24 06:29:19 TRACE Fetcher:1122 - [Consumer clientId=consumer-xxx.events-1, groupId=metadata.events] Skipping fetch for partition topic.metadata.events-1 because previous request to kafka3-ckafka1-perf3-nvan.globalrelay.net:9093 (id: 3 rack: null) has not been processed
I believe a fetch request was made to this node and its callback never removed the node from the set of pending fetches.
This is where the request future is created: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L261
This is the part of the callback handler where the node is removed from the pending set: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L321
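To illustrate the failure mode I suspect, here is a toy model of that bookkeeping (simplified names, not the actual client code): a node id is added to a pending set when a fetch is sent and is only removed in the request's completion callback, so a lost callback leaves the node pending forever and every later fetch to it is skipped.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Toy model of the pending-fetch bookkeeping; names are simplified and this
// collapses the per-partition check into sendFetch for brevity.
public class PendingFetchModel {
    private final Set<Integer> nodesWithPendingFetchRequests = ConcurrentHashMap.newKeySet();

    public void sendFetch(int nodeId, Consumer<Integer> transport) {
        if (nodesWithPendingFetchRequests.contains(nodeId)) {
            // Corresponds to the TRACE log we see.
            System.out.println("Skipping fetch because previous request to node "
                    + nodeId + " has not been processed");
            return;
        }
        nodesWithPendingFetchRequests.add(nodeId);
        // The transport is expected to eventually invoke the completion callback.
        transport.accept(nodeId);
    }

    public void onComplete(int nodeId) {
        // The removal that I suspect is never reached in our case.
        nodesWithPendingFetchRequests.remove(nodeId);
    }

    public static void main(String[] args) {
        PendingFetchModel model = new PendingFetchModel();

        // Normal case: the callback fires and clears the pending entry.
        model.sendFetch(3, model::onComplete);
        model.sendFetch(3, model::onComplete); // sends again, fine

        // Failure case: the callback is lost, node 3 stays pending forever.
        model.sendFetch(3, nodeId -> { /* response never processed */ });
        model.sendFetch(3, model::onComplete); // prints "Skipping fetch ..."
    }
}
```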
This has blocked my usability testing of the Kafka consumer, as we hit this snag within 5 minutes on every run. The record consumption rate starts at around 1,000 records/sec; once poll starts returning zero records, it drops to about 5 records/sec and never recovers. Thread count and CPU usage drop as well.