4 votes

I have recently been using Spark Streaming to process data from Kafka.

After the application starts and a few batches finish, a continuous delay appears.

Most of the time, data processing is completed within 1-5 seconds.

However, after several batches, each batch consistently took 41-45 seconds, and most of the delay occurred in stage 0, where the data is fetched from Kafka.

I happened to notice that the Kafka request.timeout.ms setting defaults to 40 seconds, and changed it to 10 seconds.

I then restarted the application and observed that batches now completed in 11 to 15 seconds.

The actual processing time is 1-5 seconds, so I cannot understand this delay.

What is wrong?

My environment is as follows.

Spark Streaming 2.1.0 (createDirectStream)

Kafka: 0.10.1

Batch interval: 20 s

request.timeout.ms: 10 s
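
For context, here is a minimal sketch of the kind of job I am running, using the Kafka 0.10 direct stream API; the broker addresses, topic name, and group id are placeholders, and request.timeout.ms is lowered as described above:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaDelayJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-delay-job")
    // 20-second batch interval, as in the question
    val ssc = new StreamingContext(conf, Seconds(20))

    // Placeholder broker list, group id, and topic
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-streaming-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      // Lowered from the 40 s default mentioned above; the Kafka consumer
      // requires request.timeout.ms to stay above session.timeout.ms,
      // so that timeout is lowered here as well.
      "session.timeout.ms" -> "8000",
      "request.timeout.ms" -> "10000"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("my-topic"), kafkaParams)
    )

    // Trivial action so every batch actually fetches and materialises the data
    stream.map(_.value).foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```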

/////

The following capture is the graph when request.timeout.ms is set to 8 seconds.

[image: Spark Streaming batch processing time graph]

You have to provide more detail. Show us what your Spark graph looks like, and perhaps add a picture of what the Streaming API looks like, and particularly dive into the batches that take a long time. - Yuval Itzchakov
Hi kim, have you solved this problem? I'm facing the same issue. - JasonWayne

2 Answers

0 votes

I found the problem and solution:

Basically, when the executors read the Kafka partitions, Spark Streaming caches the consumers used to read each partition in memory, in order to improve read and processing performance.

If the topic is very big, the cache can overflow, and when the Kafka connector issues a fetch while the cache is full, the request hits the timeout.

Solution: if you are on Spark 2.2.0 or higher, this is the fix (from the Spark documentation); it is a bug known to Spark and Cloudera:

The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via spark.streaming.kafka.consumer.cache.maxCapacity.

If you would like to disable the caching for Kafka consumers, you can set spark.streaming.kafka.consumer.cache.enabled to false. Disabling the cache may be needed to workaround the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.

The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream.

Set spark.streaming.kafka.consumer.cache.enabled to false as a parameter in your spark-submit, and your mini-batch performance will be like a supersonic aeroplane.
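
As a rough sketch, the same two properties can also be set through SparkConf in code (the keys work identically with --conf on spark-submit); the app name and the capacity value below are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  // Workaround for SPARK-19185: disable the per-executor Kafka consumer cache
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  // Or, keep the cache and raise its capacity above
  // (total Kafka partitions / number of executors)
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")

val ssc = new StreamingContext(conf, Seconds(20))
```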

0 votes

We faced the same issue, and after a lot of analysis we found that it is caused by a Kafka bug, described in KAFKA-4303.

For Spark applications, we can avoid this issue by setting reconnect.backoff.ms = 0 in the consumer config.
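
A minimal sketch of that consumer config; only reconnect.backoff.ms is the actual workaround here, while the broker address, group id, and deserializers are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "group.id" -> "my-streaming-group",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  // Workaround for KAFKA-4303: retry the connection immediately instead of backing off
  "reconnect.backoff.ms" -> "0"
)
// ... then pass kafkaParams to KafkaUtils.createDirectStream as usual
```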

I may describe more details when I have time.