
I want to write a transformed stream to an Elasticsearch index as follows:

transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})

The line val messages = rdd.map(prepare) throws the error shown below. I'm stuck on it and have tried different ways to solve the issue (e.g. adding @transient next to val conf), but nothing seems to work.
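
For reference, the @transient attempt was roughly along these lines (just a sketch, the exact placement may have differed):

class EsStream(name:String, @transient val conf:HConf) extends SparkBase with Serializable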

6/06/28 19:23:00 ERROR JobScheduler: Error running job streaming job 1467134580000 ms.0
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.map(RDD.scala:323)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
  - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
  - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
  - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, )
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, )
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
  ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
  at org.apache.spark.rdd.RDD.map(RDD.scala:323)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
  at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
  - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
  - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
  - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, )
  - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
  - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, )
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
  ... 30 more

Is it somehow related to the Hadoop configuration? (I'm referring to this part of the message: class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)

UPDATE:

class EsStream(name:String,conf:HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */ 
  val ec = getEsConf(conf)               

  /* Kafka configuration */
  val (kc,topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name,conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kc,topics,StorageLevel.MEMORY_AND_DISK).map(_._2)
    /*
     * Inline transformation of the incoming stream by any function that maps 
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)
    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
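        /*
         * Note: prepare is a method of EsStream, so this closure captures the
         * enclosing instance - including its non-serializable Hadoop
         * Configuration field, which is what the serialization stack reports.
         */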
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()    

  }

  def transform(stream:DStream[String]) = stream

  private def getEsConf(config:HConf):HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", conf.get("es.nodes"))
    _conf.set("es.port", conf.get("es.port"))

    _conf.set("es.resource", conf.get("es.resource"))

    _conf

  }

  private def getKafkaConf(config:HConf):(Map[String,String],Map[String,Int]) = {

    val cfg = Map(
      "group.id" -> conf.get("kafka.group"),

      "zookeeper.connect" -> conf.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> conf.get("kafka.timeout")

    )

    val topics = conf.get("kafka.topics").split(",").map((_,conf.get("kafka.threads").toInt)).toMap   

    (cfg,topics)

  }

  private def prepare(message:String):(Object,Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String,String]]
      case None => Map.empty[String,String]
    }

    val kw = NullWritable.get

    val vw = new MapWritable
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)

  }

}
The object of the org.apache.hadoop.conf.Configuration class seems to be globally declared as a field. Try to make it local. – Amit Kumar
@amit_kumar: Thanks. I've just posted the complete class. Could you please indicate where I should make org.apache.hadoop.conf.Configuration local? – Klue
What is HConf? That is causing the problem. Check whether it is serializable or not. – Amit Kumar
@amit_kumar: It comes from org.apache.hadoop.conf.{Configuration => HConf}, where public class Configuration implements Iterable<Map.Entry<String,String>>, Writable {...} – Klue
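
That matches the stack trace: Configuration supports Hadoop's own Writable serialization, but it does not implement java.io.Serializable, which is what Spark's default Java closure serializer needs. A quick way to confirm this (sketch, assuming hadoop-common is on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Writable

object ConfSerializabilityCheck {
  def main(args: Array[String]): Unit = {
    val c = classOf[Configuration]
    // Hadoop's own serialization mechanism: prints true
    println(classOf[Writable].isAssignableFrom(c))
    // Java serialization, used by Spark's closure serializer: prints false
    println(classOf[java.io.Serializable].isAssignableFrom(c))
  }
}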

1 Answer


Get rid of conf:HConf in the class constructor of EsStream and declare the class as class EsStream(name:String).

Next, create a method with the signature def init(conf:HConf):Map[String,String].

In this method, read the desired configuration values and update ec and (kc,topics) on the instance.

After that, call your run method.
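
A minimal sketch of that restructuring might look as follows. The field and helper names reuse the original class; the mutable vars and the @transient markers are assumptions on top of this answer (@transient keeps the remaining, non-serializable Configuration fields out of the serialized closure):

class EsStream(name:String) extends SparkBase with Serializable {

  /* Populated by init() instead of the constructor */
  @transient private var conf:HConf = _
  @transient private var ec:HConf = _

  private var kc:Map[String,String] = _
  private var topics:Map[String,Int] = _

  def init(config:HConf):Map[String,String] = {
    conf = config
    ec = getEsConf(config)
    val (k, t) = getKafkaConf(config)
    kc = k
    topics = t
    kc   // return the Kafka settings to match the suggested signature
  }

  def run() {
    /* unchanged: build the StreamingContext with createSSCLocal(name,conf),
     * transform the Kafka stream and write it with saveAsNewAPIHadoopFile(..., ec)
     */
  }

  /* getEsConf, getKafkaConf, prepare and transform stay as in the original class */
}

Driver code would then look something like this (illustrative):

val es = new EsStream("EsStream")
es.init(conf)
es.run()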