1 vote

Good morning all,

I am a beginner in Scala and Spark Streaming. My use case consists of loading a stream from Kafka into Spark Streaming and then into Elasticsearch. Here is my code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object KAFKAStreaming {

  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"
    val groupid = "GRP1"
    val topics = "producer"

    val conf = new SparkConf().setMaster("local[*]").setAppName("test")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "http://localhost:9200")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    sc.setLogLevel("OFF")

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )

    // the stream:
    // na1;03/04/2020 10:35;23
    // na2;04/04/2020 10:35;15
    // na1;05/04/2020 10:35;20
    // na2;06/04/2020 10:35;12
    // na1;07/04/2020 10:35;40

    val line = messages.map(_.value)
    val name = line.map(a => ("name" -> a.split(";")(0)))
    val timestamp = line.map(a => ("timestamp" -> a.split(";")(1)))
    val value = line.map(a => ("value" -> a.split(";")(2).toInt))

    sc.makeRDD(Seq(name, timestamp, value)).saveToEs("spark/docs")

    ssc.start()
    ssc.awaitTermination()
  }
}

I get this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. Serialization stack:

I understand that it's a serialization problem, but I don't know how to solve it.

Full error trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 2, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Graph is unexpectedly null when DStream is being serialized. Serialization stack:

at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:79)
at org.elasticsearch.spark.rdd.EsSpark$.saveToEs(EsSpark.scala:74)
at org.elasticsearch.spark.package$SparkRDDFunctions.saveToEs(package.scala:55)
at KAFKAStreaming$.main(KAFKAStreaming.scala:50)
at KAFKAStreaming.main(KAFKAStreaming.scala)
First of all, are you sure that both the key and value of the payload are of type String? - Giorgos Myrianthous
Also, can you share the full error trace? - Giorgos Myrianthous
Hi @Giorgos Myrianthous, thank you for your answer. Is your question related to the timestamp field? - AIZ
Can you please move the values below inside some Scala object and try again? val line = messages.map(_.value); val name = line.map(a => ("name" -> a.split(";")(0))); val timestamp = line.map(a => ("timestamp" -> a.split(";")(1))); val value = line.map(a => ("value" -> a.split(";")(2).toInt)) - Srinivas

2 Answers

-1 votes

There is some confusion in your code about how you create the RDD and how you persist it to Elasticsearch:

sc.makeRDD(Seq(name,timestamp,value)).saveToEs("spark/docs")

In your code, name, timestamp and value are already DStreams (not plain collections), so wrapping them in a Seq and building a new RDD out of them makes no sense, hence the error.

What you want to do is:

name.saveToEs("spark/names")
timestamp.saveToEs("spark/timestamps")
value.saveToEs("spark/values")
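
Note that name, timestamp and value are DStreams rather than RDDs, so the three calls above need either the streaming flavour of the connector (org.elasticsearch.spark.streaming._, available in elasticsearch-hadoop 5.x and later, adds saveToEs directly on a DStream) or an explicit write per micro-batch. A minimal sketch of the second option, using only the imports already present in the question:

name.foreachRDD { rdd =>
  // each micro-batch arrives as an ordinary RDD, on which saveToEs
  // (from org.elasticsearch.spark._) is available
  rdd.saveToEs("spark/names")
}

timestamp and value can be written the same way.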

If you want everything in one collection, build a case class around the messages arriving on the stream, enrich it with any extra information you need, and then save a single RDD per batch to ES, as sketched below.
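
A rough sketch of that approach, assuming the semicolon-separated records from the question; the Doc case class and the spark/docs index are illustrative names, not part of the original code:

// Illustrative case class for one parsed record; define it at the top level
// (next to the object) so it serializes cleanly.
case class Doc(name: String, timestamp: String, value: Int)

val docs = line.map { a =>
  val f = a.split(";")
  Doc(f(0), f(1), f(2).toInt)
}

// one RDD of complete documents per micro-batch
docs.foreachRDD(rdd => rdd.saveToEs("spark/docs"))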

-1 votes

https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html

You can save JSON documents to Elasticsearch as shown below.

import org.elasticsearch.spark.rdd.EsSpark
import spark.implicits._ // assumes a SparkSession named spark is in scope, for toDF/toJSON

val yourRdd = Seq(("name", "timestamp", "value"), ("name1", "timestamp1", "value1")).toDF("name", "timestamp", "value").toJSON.rdd
EsSpark.saveJsonToEs(yourRdd, "spark/docs")
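
To use this from the streaming job in the question, the conversion and the save would go inside foreachRDD, since a DStream is not itself an RDD. A rough sketch, reusing line from the question and building the JSON strings by hand from the sample record layout (the spark/docs index name is illustrative):

line.foreachRDD { rdd =>
  val json = rdd.map { a =>
    val f = a.split(";")
    s"""{"name":"${f(0)}","timestamp":"${f(1)}","value":${f(2)}}"""
  }
  EsSpark.saveJsonToEs(json, "spark/docs")
}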