Spark Streaming Problem with Kafka DirectStream:
spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096
Tried:
1) Adjust increasing spark.streaming.kafka.consumer.poll.ms
-- from 512 to 4096, less failed, but even 10s the failed still exists
2) Adjust executor memory from 1G to 2G
-- partly work, much less failed
3) https://issues.apache.org/jira/browse/SPARK-19275
-- still got failed when streaming durations all less than 8s ("session.timeout.ms" -> "30000")
4) Try Spark 2.1
-- problem still there
with Scala 2.11.8, Kafka version : 0.10.0.0, Spark version : 2.0.2
Spark configs
.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")
using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
Error stacks:
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Losing 1%+ blocks datum from Kafka with this failure :( pls help!