
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:32)
    at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:16)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:22)
    at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:20)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.exadatum.ddq.core.Runner$.run(Runner.scala:20)
    at org.exadatum.ddq.core.RunCheck.<init>(RunCheck.scala:104)
    at org.exadatum.ddq.core.DQJobTrigger$.main(DQJobTrigger.scala:39)
    at org.exadatum.ddq.core.DQJobTrigger.main(DQJobTrigger.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1d9bd4d6)
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint, name: sc, type: class org.apache.spark.SparkContext)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint, DateFormatConstraint(startdate,java.text.SimpleDateFormat@4f76f1a0,org.apache.spark.SparkContext@1d9bd4d6,xdqdemo.customer_details))
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, )
    - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2)
    - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, )
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(startdate#2))
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(UDF(startdate#2)))
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, name: predicates, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, InMemoryColumnarTableScan [phone_number#0,name#1,startdate#2], [UDF(startdate#2)], InMemoryRelation [phone_number#0,name#1,startdate#2], true, 10000, StorageLevel(false, true, false, true, 1), ConvertToUnsafe, None)
    - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan)
    - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[8] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@316975be)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@316975be))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@526fbb80)
    - writeObject data (class: scala.collection.immutable.$colon$colon)
    - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@526fbb80))
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at rdd at DateFormatConstraint.scala:32)
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, )
    - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
    - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 39 more

Code snippet:

val fun = (df: DataFrame) => {

  format.setLenient(false)

  // UDF that flags values which cannot be parsed with the expected date format
  val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
  val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count)

  /** Utility to persist all of the bad records **/
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._

  // Writing all bad records
  val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName)))

  val recordLists = new ListBuffer[List[(String, String, String)]]()
  writeToHiveDf.rdd.collect().foreach { row =>
    val item = row.mkString("-")
    val recordList: List[(String, String, String)] =
      List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item)).map { case List(a, b, c) => (a, b, c) }
    recordLists += recordList
  }
  val listRDD = sc.parallelize(recordLists.flatten)
  val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
  dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")

  DateFormatConstraintResult(
    this,
    data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
    status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
  )

}

Comments:

Glennie Helles Sindholt: I'm guessing it's the format.parse that is causing the problem. You haven't included the initialization of the format variable in the snippet, but either the format class is not serializable in itself, or the class from which you are initializing format is not serializable (most likely the latter).

user8236853: format is an argument, initialized as format = new SimpleDateFormat("some-date-format").

user8236853: Thanks for your reply.

Glennie Helles Sindholt: Did you solve your problem? Otherwise, please include a little more of the code. In particular, in what object/class is this fun function declared, and where is the format variable instantiated? Do the classes include any class variables or other inner classes?

Ramesh Maharjan: Sample input data would help as well.
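
Note that the serialization stack above actually points at the SparkContext field (name: sc) of DateFormatConstraint: because the UDF is defined inside that class and reads its members, the compiled anonymous function keeps an $outer reference to the whole constraint instance, and Spark then tries to serialize that instance, sc included. A minimal sketch of this capture problem and the usual local-copy workaround follows; the class and member names here are illustrative, not taken from the DDQ codebase:

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

class BadConstraint(sc: SparkContext, tableName: String) {

  def check(df: DataFrame, columnName: String): Long = {
    // The lambda reads `tableName`, an instance member, so its $outer field
    // points at the whole BadConstraint instance, including the
    // non-serializable SparkContext => "Task not serializable".
    val flagged = udf((s: String) => s != null && s.startsWith(tableName))
    df.filter(flagged(df(columnName))).count()
  }

  def checkFixed(df: DataFrame, columnName: String): Long = {
    // Copying the member into a local val means the closure captures only a
    // String, and the task serializes cleanly.
    val localName = tableName
    val flagged = udf((s: String) => s != null && s.startsWith(localName))
    df.filter(flagged(df(columnName))).count()
  }
}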

1 Answer

import java.text.SimpleDateFormat
import scala.util.Try
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf

object checkConstraint extends Serializable {
  def checkDateFormat(format: SimpleDateFormat, columnName: String, df: DataFrame): DataFrame = {
    format.setLenient(false)
    // The parsing logic lives in a serializable singleton object, so the UDF
    // closure no longer drags in the enclosing class (and its SparkContext field).
    val isNotParsableDate = (column: String) => Try(format.parse(column)).isFailure
    val cannotBeDate = udf((column: String) => column != null && isNotParsableDate(column))
    df.filter(cannotBeDate(new Column(columnName)))
  }
}

val writeToHiveDf = checkConstraint.checkDateFormat(format, columnName, df)

Hence all the computations are packed inside a serializable singleton object, which returns the required DataFrame. Since the UDF closure now captures only the serializable format and columnName values instead of the constraint class, Spark no longer attempts to serialize the SparkContext.
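
For completeness, here is a sketch (not part of the original answer) of how the question's fun could be rewritten around this helper; identifiers such as sc, format, columnName, tableName, DateFormatConstraintResult, DateFormatConstraintResultData and ConstraintUtil are assumed to come from the surrounding DateFormatConstraint class, exactly as in the question:

val fun = (df: DataFrame) => {
  // Filtering goes through the serializable helper, so no UDF closes over `this`
  val writeToHiveDf = checkConstraint.checkDateFormat(format, columnName, df)
  val maybeCannotBeDateCount = Try(writeToHiveDf.count)

  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._

  // Persisting bad records happens on the driver, where using `sc` is safe
  val badRecords = writeToHiveDf.rdd.collect().map { row =>
    (tableName, "ALWAYS_NULL_CONSTRAINT", row.mkString("-"))
  }
  sc.parallelize(badRecords).toDF("table_name", "constraint_applied", "data")
    .write.mode("append").saveAsTable("xdqdemo.bad_records")

  DateFormatConstraintResult(
    this,
    data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
    status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
  )
}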