4
votes

I'm doing Spark Streaming over Kafka. The streaming job starts fine and runs for a few hours before running into the following issue:

17/05/18 03:44:47 ERROR Executor: Exception in task 8.0 in stage 1864.0 (TID 27968)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-c10f4ea9-a1c6-4a9f-b87f-8d6ff66e10a5 madlytics-rt_1 3 1150964759 after polling for 60000
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Also, I increased the values of heartbeat.interval.ms, session.timeout.ms and request.timeout.ms appropriately, as suggested here: https://issues.apache.org/jira/browse/SPARK-19275

Given below are some relevant configs:

batch.interval = 60s
spark.streaming.kafka.consumer.poll.ms = 60000
session.timeout.ms = 60000 (default: 30000)
heartbeat.interval.ms = 6000 (default: 3000)
request.timeout.ms = 90000 (default: 40000)
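
For context, the stream is set up roughly as follows (a simplified sketch, not the actual job: broker addresses, group id, application name and the processing logic are placeholders; the topic and the timeouts are the ones listed above):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object StreamingJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("streaming-job")                             // placeholder name
      .set("spark.streaming.kafka.consumer.poll.ms", "60000")  // executor-side poll timeout

    val ssc = new StreamingContext(conf, Seconds(60))          // 60s batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"     -> "broker1:9092,broker2:9092",  // placeholders
      "key.deserializer"      -> classOf[StringDeserializer],
      "value.deserializer"    -> classOf[StringDeserializer],
      "group.id"              -> "my-consumer-group",          // placeholder
      "session.timeout.ms"    -> "60000",
      "heartbeat.interval.ms" -> "6000",
      "request.timeout.ms"    -> "90000",
      "enable.auto.commit"    -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("madlytics-rt_1"), kafkaParams)
    )

    // placeholder processing; the real job does more work per batch
    stream.foreachRDD(rdd => rdd.foreach(record => println(record.value())))

    ssc.start()
    ssc.awaitTermination()
  }
}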

Also, the Kafka cluster has 5 nodes, and the topic I'm reading from has 15 partitions. Some other Kafka configs are listed below:

num.network.threads=8
num.io.threads=8

Any help will be much appreciated. Thanks.

Did you check the Kafka broker(s)? Anything in the logs? – Jacek Laskowski
@JacekLaskowski, yes, I checked the logs. There were no such ERROR messages there. – Aaquib Khwaja
Anything in the Spark application's logs that would be of interest? What about the web UI and processing times? – Jacek Laskowski
No, the only ones I could see were the ones I've mentioned in the post. – Aaquib Khwaja
I am experiencing the same issue all of a sudden. Has this been resolved? – SunitaKoppar

2 Answers

3
votes

I solved the issue with a simple configuration change. It was quite apparent in hindsight, but it took me some time to realize how such a default (mis)configuration could be left in place.

The primary issue is that the Spark setting spark.streaming.kafka.consumer.poll.ms (default 512 ms in KafkaRDD), or spark.network.timeout (default 120 s, used when spark.streaming.kafka.consumer.poll.ms is not set), is always less than the Kafka consumer's request.timeout.ms (default 305000 ms in the new Kafka consumer API). Hence Spark's poll always times out before the timeout happens on the Kafka consumer's request/poll (when there are no records available in the Kafka topic).

Simply increasing spark.streaming.kafka.consumer.poll.ms to a value greater than Kafka's request.timeout.ms should do the trick. Also adjust the Kafka consumer's max.poll.interval.ms so that it is always less than request.timeout.ms.
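
For example, with illustrative values (what matters is the ordering, not the exact numbers: spark.streaming.kafka.consumer.poll.ms > request.timeout.ms > max.poll.interval.ms):

import org.apache.spark.SparkConf

object TimeoutOrdering {
  // Spark side: make the poll timeout larger than the Kafka consumer's request.timeout.ms
  val conf = new SparkConf()
    .set("spark.streaming.kafka.consumer.poll.ms", "600000")  // 10 min, > 305000 ms

  // Kafka consumer side: keep max.poll.interval.ms below request.timeout.ms
  val kafkaParams = Map[String, Object](
    "request.timeout.ms"   -> "305000",  // new-consumer default
    "max.poll.interval.ms" -> "300000"   // default, already below request.timeout.ms
  )
}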

Q.E.D. and good luck.

0
votes

In my experience, this particular failure is a symptom of an overloaded Kafka cluster. The usual suspects are GC stop-the-world pauses and thread starvation.

On top of that, Kafka may look fine on the surface while it is actually struggling.

Is it spending a lot of time rebalancing after you added a partition? Or is it maintaining a humongous offsets topic because of all the load tests you performed?

What happened to me once was that the cluster looked fine on the surface, but this timeout kept showing up here and there. On a brand new, and even smaller, cluster the issue disappeared.