I want to pass a path to the saveAsTextFile function that runs inside a Spark Streaming job. However, I get a java.io.NotSerializableException. Usually in similar cases I use a singleton object, but in this particular case I don't know how to solve the issue. Can anybody help me, please?
import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging {

  // ...
  // dstream: DStream[String]
  dstream.foreachRDD { rdd =>
    // rdd: RDD[String], where each element is one JSON document
    // Parse each JSON document
    // splitted: RDD[Map[String, Any]]
    val splitted = rdd.map(line => Utils.parseJSON(line))
    // ...
    splitted.saveAsTextFile(path)
  }
}
object Utils {
  def parseJSON(json: String): Map[String, Any] = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.readValue[Map[String, Any]](json)
  }
}
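As far as I understand, Utils is already the kind of singleton object I mentioned: objects are never serialized along with a closure, only re-initialized in each JVM, so the rdd.map(Utils.parseJSON) part should be fine on its own. If it matters, the mapper could equally be built once per JVM instead of once per record; a sketch of that variant (same behavior with respect to serialization, as far as I can tell):

object Utils {
  // Built once per JVM (driver and each executor). Singleton objects are not
  // serialized into closures, so this stays safe to use from RDD operations.
  private lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m
  }

  def parseJSON(json: String): Map[String, Any] =
    mapper.readValue[Map[String, Any]](json)
}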
The whole stack trace:
16/09/22 17:03:28 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.consumer.kafka.KafkaTestConsumer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136)
    at org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20)
    at org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)
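From the trace, the exception is thrown when StreamingContext.start serializes the DStream graph, and the graph drags KafkaTestConsumer in with it, presumably because the foreachRDD closure reads the field path and therefore captures this. The usual fix I know is to copy the field into a local val before building the closure, roughly like the sketch below (the run signature is just a placeholder for my real setup), but I am not sure whether that is the right approach here:

import org.apache.spark.streaming.dstream.DStream

class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging {

  def run(dstream: DStream[String]): Unit = {
    // Copy the field into a local val first: the closure then captures only
    // this String instead of `this`, the non-serializable KafkaTestConsumer.
    val localPath = path
    dstream.foreachRDD { rdd =>
      rdd.map(line => Utils.parseJSON(line))
         .saveAsTextFile(localPath)
    }
  }
}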