
We are using the Spark Streaming receiver-based approach, and we just enabled checkpointing to get rid of a data loss issue.

The Spark version is 1.6.1, and we are receiving messages from a Kafka topic.

I'm using ssc inside the foreachRDD method of the DStream, and it throws a NotSerializableException.

I tried making the class extend Serializable, but I still get the same error. It happens only when we enable checkpointing.

Code is:

def main(args: Array[String]): Unit = {

  val checkPointLocation = "/path/to/wal"
  val ssc = StreamingContext.getOrCreate(checkPointLocation, () => createContext(checkPointLocation))
  ssc.start()
  ssc.awaitTermination()
}

def createContext(checkPointLocation: String): StreamingContext = {

  val sparkConf = new SparkConf().setAppName("Test")
  sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(sparkConf, Seconds(40))
  ssc.checkpoint(checkPointLocation)
  val sc = ssc.sparkContext
  val sqlContext: SQLContext = new HiveContext(sc)
  val kafkaParams = Map(
    "group.id" -> groupId,
    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> sasl,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    "metadata.broker.list" -> brokerList,
    "zookeeper.connect" -> zookeeperURL)
  val dStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    .map(_._2)
  dStream.foreachRDD(rdd => {
    // using sparkContext / sqlContext to do any operation throws error.
    // convert RDD[String] to RDD[Row]
    // Create Schema for the RDD.
    sqlContext.createDataFrame(rdd, schema)
  })
  ssc
}

Error log:

2017-02-08 22:53:53,250 ERROR [Driver] streaming.StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1c5e3677)
    - field (class: com.x.payments.RemedyDriver$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.x.payments.RemedyDriver$$anonfun$main$1, )
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, )
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@68866c5)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@68866c5))
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.kafka.KafkaInputDStream@acd8e32))
    - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@6935641e)
    - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@484bf033)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at com.x.payments.RemedyDriver$.main(RemedyDriver.scala:104)
    at com.x.payments.RemedyDriver.main(RemedyDriver.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:559)
2017-02-08 22:53:53,250 ERROR [Driver] payments.RemedyDriver$: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
2017-02-08 22:53:53,255 INFO [Driver] yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0

Update:

Basically, what we are trying to do is convert the RDD to a DataFrame inside the foreachRDD method of the DStream, apply the DataFrame API on top of it, and finally store the data in Cassandra. We used sqlContext to convert the RDD to a DataFrame, and that is where the error is thrown.
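
For reference, here is a minimal sketch of that per-batch conversion, assuming a single-column string schema (the schema and column name are placeholders, and the RDD[String]-to-RDD[Row] step is spelled out); this is the body that starts failing once checkpointing is enabled, because it references the outer sqlContext:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed single-column schema for the raw Kafka payload (placeholder).
val schema = StructType(Seq(StructField("payload", StringType, nullable = true)))

dStream.foreachRDD(rdd => {
  // RDD[String] -> RDD[Row], then RDD[Row] -> DataFrame
  val rowRdd = rdd.map(Row(_))
  val df = sqlContext.createDataFrame(rowRdd, schema)
  // The DataFrame API would be applied to df here before writing the result to Cassandra.
})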

Can you show what is being done inside the foreachRDD? At least a sample. – rogue-one
@Yuval Itzchakov: Could you please help me with this issue? – Shankar

1 Answer


If you want to access the SparkContext, do so via the rdd value:

dStream.foreachRDD(rdd => {
  val sqlContext = new HiveContext(rdd.context)
  val dataFrameSchema = sqlContext.createDataFrame(rdd, schema)
})
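
If HiveContext-specific features are not actually needed here (an assumption), a common Spark 1.6 variant is to reuse a lazily created singleton SQLContext instead of constructing a HiveContext on every micro-batch; schema is the same StructType as in the question:

import org.apache.spark.sql.{Row, SQLContext}

dStream.foreachRDD(rdd => {
  // Reuse one SQLContext per JVM, derived from the RDD's SparkContext.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  val df = sqlContext.createDataFrame(rdd.map(Row(_)), schema)
})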

This:

dStream.foreachRDD(rdd => {
  // using sparkContext / sqlContext to do any operation throws error.
  val numRDD = sc.parallelize(1 to 10, 2)
  log.info("NUM RDD COUNT:" + numRDD.count())
})

is causing the SparkContext to be captured by the closure and serialized, which fails because SparkContext is not serializable.
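
A minimal rewrite of that snippet which avoids the capture, using the same approach of going through the rdd value (println stands in for the original logger):

dStream.foreachRDD(rdd => {
  // The SparkContext is obtained from the rdd parameter inside the closure,
  // so no non-serializable object from the enclosing scope is captured.
  val numRDD = rdd.sparkContext.parallelize(1 to 10, 2)
  println("NUM RDD COUNT: " + numRDD.count())
})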