Good morning all,
I am a beginner in scala and spark streaming, my use case consist on loading a stream from Kafka to spark streaming then to elasticsearch, here is my code:
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
object KAFKAStreaming {
def main(args: Array[String]): Unit = {
val brokers ="localhost:9092"
val groupid ="GRP1"
val topics ="producer"
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes","http://localhost:9200")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
sc.setLogLevel("OFF")
val topicSet= topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupid,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
)
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
// the stream :
//na1;03/04/2020 10:35;23
//na2;04/04/2020 10:35;15
//na1;05/04/2020 10:35;20
//na2;06/04/2020 10:35;12
//na1;07/04/2020 10:35;40
val line = messages.map(_.value)
val name =line.map(a=>("name" -> a.split(";")(0)))
val timestamp =line.map(a=>("timestamp" -> a.split(";")(1)))
val value =line.map(a=>("value" -> a.split(";")(2).toInt))
sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")
ssc.start()
ssc.awaitTermination()
}
}
I get this error:Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. Serialization stack:
I understood that it's a serialization problem, but I did not know how to solve it.
full error trace :
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. Serialization stack:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
at KAFKAStreaming$.main(KAFKAStreaming.scala:50)
at KAFKAStreaming.main(KAFKAStreaming.scala)
String? - Giorgos Myrianthous