I want to write a transformed stream to an Elasticsearch index as follows:
transformed.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val messages = rdd.map(prepare)
    messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
  }
})
The line val messages = rdd.map(prepare) throws the error shown below. I am stuck on it and have tried different ways to solve the issue (e.g. adding @transient next to val conf), but nothing seems to work.
16/06/28 19:23:00 ERROR JobScheduler: Error running job streaming job 1467134580000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.map(RDD.scala:323)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:77)
    at de.kp.spark.elastic.stream.EsStream$$anonfun$run$1.apply(EsStream.scala:75)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
    - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
    - field (class: de.kp.spark.elastic.stream.EsStream, name: de$kp$spark$elastic$stream$EsStream$$conf, type: class org.apache.hadoop.conf.Configuration)
    - object (class de.kp.spark.elastic.stream.EsStream, de.kp.spark.elastic.stream.EsStream@6b156e9a)
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, name: $outer, type: class de.kp.spark.elastic.stream.EsStream)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1, <function1>)
    - field (class: de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, name: $outer, type: class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1)
    - object (class de.kp.spark.elastic.stream.EsStream$$anonfun$run$1$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Is it somehow related to the Hadoop configuration? (I am referring to this part of the message: class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)
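My current understanding (which may well be wrong) is that the Configuration is not something I pass to the workers on purpose; it gets dragged in because prepare is a method of EsStream, so rdd.map(prepare) has to serialize the whole EsStream instance, including its conf field. Here is a minimal standalone sketch of that pattern with made-up names (Holder, CaptureCheck), not my real code:

import org.apache.hadoop.conf.{Configuration => HConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical class that only mirrors the shape of EsStream.
class Holder extends Serializable {

  val hadoopConf = new HConf()                    // non-serializable field, like conf in EsStream

  def upper(s: String): String = s.toUpperCase    // instance method, like prepare

  def run(rdd: RDD[String]): Array[String] =
    // Passing the instance method captures `this`, and with it hadoopConf,
    // so the closure cleaner should fail with "Task not serializable".
    rdd.map(upper).collect()
}

object CaptureCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("capture-check").setMaster("local[2]"))
    try {
      new Holder().run(sc.parallelize(Seq("a", "b"))).foreach(println)
    } catch {
      case e: org.apache.spark.SparkException =>
        println(s"Fails as expected: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}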
UPDATE:
class EsStream(name: String, conf: HConf) extends SparkBase with Serializable {

  /* Elasticsearch configuration */
  val ec = getEsConf(conf)

  /* Kafka configuration */
  val (kc, topics) = getKafkaConf(conf)

  def run() {

    val ssc = createSSCLocal(name, conf)

    /*
     * The KafkaInputDStream returns a Tuple where only the second component
     * holds the respective message; we therefore reduce to a DStream[String]
     */
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kc, topics, StorageLevel.MEMORY_AND_DISK).map(_._2)

    /*
     * Inline transformation of the incoming stream by any function that maps
     * a DStream[String] onto a DStream[String]
     */
    val transformed = transform(stream)

    /*
     * Write transformed stream to Elasticsearch index
     */
    transformed.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        val messages = rdd.map(prepare)
        messages.saveAsNewAPIHadoopFile("-", classOf[NullWritable], classOf[MapWritable], classOf[EsOutputFormat], ec)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def transform(stream: DStream[String]) = stream

  private def getEsConf(config: HConf): HConf = {

    val _conf = new HConf()

    _conf.set("es.nodes", config.get("es.nodes"))
    _conf.set("es.port", config.get("es.port"))
    _conf.set("es.resource", config.get("es.resource"))

    _conf
  }

  private def getKafkaConf(config: HConf): (Map[String, String], Map[String, Int]) = {

    val cfg = Map(
      "group.id" -> config.get("kafka.group"),
      "zookeeper.connect" -> config.get("kafka.zklist"),
      "zookeeper.connection.timeout.ms" -> config.get("kafka.timeout")
    )

    val topics = config.get("kafka.topics").split(",").map((_, config.get("kafka.threads").toInt)).toMap

    (cfg, topics)
  }

  private def prepare(message: String): (Object, Object) = {

    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String, String]]
      case None => Map.empty[String, String]
    }

    val kw = NullWritable.get
    val vw = new MapWritable

    for ((k, v) <- m) vw.put(new Text(k), new Text(v))

    (kw, vw)
  }
}
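One restructuring I am experimenting with (I am not sure it is the right approach, or whether it is enough on its own): moving prepare out of EsStream into a standalone object, so that the closure passed to rdd.map no longer references the EsStream instance and its Configuration field. A sketch, with the object name (MessagePreparer) made up by me:

import scala.util.parsing.json.JSON
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}

/*
 * Standalone helper: it references nothing from EsStream, so the closure
 * passed to rdd.map should not need to serialize the Configuration field.
 */
object MessagePreparer {
  def prepare(message: String): (Object, Object) = {
    val m = JSON.parseFull(message) match {
      case Some(map) => map.asInstanceOf[Map[String, String]]
      case None      => Map.empty[String, String]
    }
    val vw = new MapWritable()
    for ((k, v) <- m) vw.put(new Text(k), new Text(v))
    (NullWritable.get, vw)
  }
}

// and inside run():
// val messages = rdd.map(MessagePreparer.prepare)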
HConf is an alias for Hadoop's Configuration class: import org.apache.hadoop.conf.{Configuration => HConf}, where public class Configuration implements Iterable<Map.Entry<String,String>>, Writable {...} – Klue
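Regarding the Writable point in that comment: as far as I understand, Configuration can be (de)serialized through Hadoop's Writable interface, but it does not implement java.io.Serializable, which is what Spark's closure serializer uses. A quick standalone check I used to convince myself (the object name is mine, this is not part of the job):

import java.io._
import org.apache.hadoop.conf.{Configuration => HConf}

object ConfSerializationCheck {
  def main(args: Array[String]): Unit = {
    val c = new HConf()

    // Hadoop's own serialization works: Configuration implements Writable.
    val bytes = new ByteArrayOutputStream()
    c.write(new DataOutputStream(bytes))
    println(s"Writable form: ${bytes.size()} bytes")

    // Java serialization, which the closure cleaner relies on, does not work.
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(c)
    } catch {
      case e: NotSerializableException => println(s"Java serialization fails: $e")
    }
  }
}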