2 votes

I want to pass a path to the saveAsTextFile call that runs in Spark Streaming. However, I get a java.io.NotSerializableException. Usually in similar cases I use a skeleton object, but in this particular case I don't know how to solve the issue. Can anybody help me, please?

import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging
{
  // ...
  // DStream[String]
  dstream.foreachRDD { rdd =>
    // rdd -> RDD[String], each String is a JSON
    // Parsing each JSON
    // splitted -> RDD[Map[String,Any]]
    val splitted = rdd.map(line => Utils.parseJSON(line))
    // ...
    splitted.saveAsTextFile(path)
  }
}

object Utils {

  def parseJSON(json: String): Map[String, Any] = {
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.readValue[Map[String,Any]](json)
  }
}

The whole stack trace:

16/09/22 17:03:28 ERROR Utils: Exception encountered
java.io.NotSerializableException: org.consumer.kafka.KafkaTestConsumer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:180)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:175)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializableWithWriteObjectMethod(SerializationDebugger.scala:230)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:189)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:560)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.consumer.kafka.KafkaDecisionsConsumer.run(KafkaTestConsumer.scala:136)
    at org.consumer.ServiceRunner$.main(QueuingServiceRunner.scala:20)
    at org.consumer.ServiceRunner.main(QueuingServiceRunner.scala)

Can you please post the whole stack trace? – bear911
@bear911: Done. – Lobsterrrr
What is the type parameter of this DStream? Can you post more code? – bear911
@bear911: Ok, I posted more code with comments. – Lobsterrrr
The problem might be with JSON parsing. I would double-check the Scala versions used in your project and, if everything is OK, perhaps use some other JSON library. Maybe someone else will be able to troubleshoot this one. – bear911

2 Answers

0 votes

The problem is that you are calling the RDD action saveAsTextFile inside the DStream operation foreachRDD. The closure you pass to foreachRDD has to be serialized and shipped to the workers, and when a worker tries to execute splitted.saveAsTextFile(path) it drags in the enclosing class, which is why you get the serialization error. You can restructure it so the save happens at the DStream level instead:

// Parse each JSON record at the DStream level and let Spark Streaming
// write the results; saveAsTextFiles is the DStream counterpart of the
// RDD action saveAsTextFile.
// splitted -> DStream[Map[String,Any]]
val splitted = dstream.map(line => Utils.parseJSON(line))
splitted.saveAsTextFiles(path)
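Alternatively, if you want to keep the foreachRDD structure from the question, a common sketch of the fix (the local value name outputPath is just an illustration, not from the original code) is to copy the path field into a local val first, so the closure captures only a String instead of the whole non-serializable KafkaTestConsumer instance:

val outputPath = path  // hypothetical local copy of the constructor field
dstream.foreachRDD { rdd =>
  // rdd -> RDD[String], each String is a JSON document
  val splitted = rdd.map(line => Utils.parseJSON(line))
  // The closure now references only outputPath (a String), not `this`
  splitted.saveAsTextFile(outputPath)
}
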
0 votes

While working with Spark 2.3.0 I encountered the same issue. I did not remove the checkpoint statement; I got it resolved by doing two things:

Running the command:

chmod 777 checkpoint_directory

And implementing the Serializable interface in the class that threw the error.

In your case, you need to implement Serializable for the class below. Hopefully that resolves it.

org.consumer.kafka.KafkaTestConsumer
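
A minimal sketch of that change, reusing the constructor signature from the question (the only addition is the Serializable mix-in; the body is elided):

// Mixing in Serializable lets Spark serialize closures that capture this instance.
class KafkaTestConsumer(val zkQuorum: String,
                        val group: String,
                        val topicMessages: String,
                        val path: String) extends Logging with Serializable {
  // ... streaming logic as in the question ...
}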