1 vote

We have written a Kafka consumer which polls for data based on its config. Every poll returns about 400 Avro records, which we buffer; after buffering, we seek to the end offset. When the buffer size reaches 2000, we write the records to HDFS using ExecutorService threads and wait for all of them to finish via Future.get. We keep appending to the same files in HDFS (a staging folder) until we reach a commit size of 10k. After reaching the commit size, we move the files from the staging directory to a final output location in HDFS (this is done for an atomic commit); the next flush of buffered data will create new files in the staging directory. This scheme avoids small files in HDFS and limits open file handles. After committing the files to the output location, we do an async commit of offsets. A sketch of the loop is below.
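For reference, a minimal sketch of that loop as I understand it (the class and helper names BufferedHdfsSink, partition, appendToStagingFile, and moveStagingToOutput are hypothetical, and the HDFS details are stubbed out as abstract methods):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public abstract class BufferedHdfsSink {
    private static final int FLUSH_SIZE = 2_000;   // write buffered records to staging
    private static final int COMMIT_SIZE = 10_000; // move staging files to final output

    private final KafkaConsumer<String, GenericRecord> consumer;
    private final ExecutorService writers = Executors.newFixedThreadPool(4);
    private final List<GenericRecord> buffer = new ArrayList<>();
    private long uncommitted = 0;

    protected BufferedHdfsSink(KafkaConsumer<String, GenericRecord> consumer) {
        this.consumer = consumer;
    }

    void run() throws Exception {
        while (true) {
            // Each poll returns ~400 Avro records, which are buffered.
            ConsumerRecords<String, GenericRecord> records =
                    consumer.poll(Duration.ofSeconds(30));
            records.forEach(r -> buffer.add(r.value()));

            if (buffer.size() >= FLUSH_SIZE) {
                // Fan the writes out to executor threads, then block until all finish.
                List<Future<?>> pending = new ArrayList<>();
                for (List<GenericRecord> chunk : partition(buffer)) {
                    pending.add(writers.submit(() -> appendToStagingFile(chunk)));
                }
                for (Future<?> f : pending) {
                    f.get();
                }
                uncommitted += buffer.size();
                buffer.clear();
            }

            if (uncommitted >= COMMIT_SIZE) {
                moveStagingToOutput();  // HDFS rename, used as the atomic commit
                consumer.commitAsync(); // commit offsets only after the move succeeds
                uncommitted = 0;
            }
        }
    }

    // Hypothetical hooks standing in for the HDFS-specific code described above.
    protected abstract List<List<GenericRecord>> partition(List<GenericRecord> records);
    protected abstract void appendToStagingFile(List<GenericRecord> chunk);
    protected abstract void moveStagingToOutput();
}
```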

After about 5 minutes, once 4-5 successful writes and offset commits have completed, we start seeing this log:

2020-05-24 06:29:19 TRACE Fetcher:1122 - [Consumer clientId=consumer-xxx.events-1, groupId=metadata.events] Skipping fetch for partition topic.metadata.events-1 because previous request to kafka3-ckafka1-perf3-nvan.globalrelay.net:9093 (id: 3 rack: null) has not been processed

I believe some fetch request was made to this node and its callback never removed it from the pending fetches.

This is where the request future is created: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L261

This is the part of the callback handler where the request is removed from pending: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L321
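To illustrate, the pattern in the linked code is roughly the following (a simplified paraphrase of Fetcher.sendFetches() as it stood on trunk, not an exact excerpt, and not runnable on its own):

```java
// The node is marked as having a pending fetch before the request is sent,
// and is only unmarked inside the listener's onSuccess/onFailure callbacks.
nodesWithPendingFetchRequests.add(fetchTarget.id());
RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
future.addListener(new RequestFutureListener<ClientResponse>() {
    @Override
    public void onSuccess(ClientResponse resp) {
        synchronized (Fetcher.this) {
            // ... handle the fetch response, enqueue completed fetches ...
            nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }

    @Override
    public void onFailure(RuntimeException e) {
        synchronized (Fetcher.this) {
            // ... handle the error ...
            nodesWithPendingFetchRequests.remove(fetchTarget.id());
        }
    }
});
// If neither callback ever runs, subsequent sendFetches() calls keep skipping
// that node, which produces the "Skipping fetch" TRACE line above.
```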

This has blocked my usability testing of the Kafka consumer, as we hit this snag within 5 minutes of every run. The record consumption rate starts at around 1000 rps; once poll starts returning zero records, it drops to about 5 rps and never recovers. Thread count and CPU usage drop as well.

How much time does it take between 2 polls? – Dragonborn
Polls which actually get records take no more than 20s, and I have set 30s as the poll timeout. I also see some disconnect exceptions:
2020-05-24 06:29:45 INFO FetchSessionHandler:445 - [Consumer clientId=consumer-ERS_hkg7_cp.ers.perf3.ca.nvan.ma.metadata.events-1, groupId=ERS_hkg7_cp.ers.perf3.ca.nvan.ma.metadata.events] Error sending fetch request (sessionId=1077301634, epoch=2668) to node 3: {}.
org.apache.kafka.common.errors.DisconnectException – HPKG
In debug mode, you can put a breakpoint, remove the pending item, and see if the poll returns records. My guess is the issue is somewhere else. – Dragonborn

1 Answer

1 vote

The issue was due to open file handles, caused by Avro's DataFileWriter API in append mode. It takes a seekable input and an output stream. When you close the writer, it closes the output stream but not the seekable input. Because of this, the number of open file handles crossed 44k on all the DataNodes, and the record consumption rate fell drastically. Closing the seekable input explicitly fixed the issue.
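For illustration, a minimal sketch of the corrected append path (the class and method names here are hypothetical; FsInput is the SeekableInput implementation for HDFS from avro-mapred):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AvroHdfsAppend {
    static void appendRecords(FileSystem fs, Configuration conf, Path path,
                              Schema schema, Iterable<GenericRecord> records)
            throws IOException {
        // appendTo(SeekableInput, OutputStream) needs both streams: the seekable
        // input to read the existing file's header, the output stream to append to.
        FsInput seekableInput = new FsInput(path, conf);
        FSDataOutputStream out = fs.append(path);
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        try {
            writer.appendTo(seekableInput, out);
            for (GenericRecord record : records) {
                writer.append(record);
            }
        } finally {
            writer.close();        // closes 'out', but NOT 'seekableInput'
            seekableInput.close(); // without this, every append leaks one handle
        }
    }
}
```

With the explicit close, each flush releases both streams, so the handle count on the DataNodes stays flat instead of growing with every append.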