Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.collect(RDD.scala:926) at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:32) at org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2.apply(DateFormatConstraint.scala:16) at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22) at org.exadatum.ddq.core.Runner$$anonfun$run$1$$anonfun$3.apply(Runner.scala:22) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:22) at org.exadatum.ddq.core.Runner$$anonfun$run$1.apply(Runner.scala:20) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.exadatum.ddq.core.Runner$.run(Runner.scala:20) at org.exadatum.ddq.core.RunCheck.(RunCheck.scala:104) at org.exadatum.ddq.core.DQJobTrigger$.main(DQJobTrigger.scala:39) at org.exadatum.ddq.core.DQJobTrigger.main(DQJobTrigger.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1d9bd4d6) - field (class: org.exadatum.ddq.constraints.DateFormatConstraint, name: sc, type: class org.apache.spark.SparkContext) - object (class org.exadatum.ddq.constraints.DateFormatConstraint, DateFormatConstraint(startdate,java.text.SimpleDateFormat@4f76f1a0,org.apache.spark.SparkContext@1d9bd4d6,xdqdemo.customer_details)) - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint) - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2, ) - field (class: org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, name: $outer, type: class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2) - object (class org.exadatum.ddq.constraints.DateFormatConstraint$$anonfun$2$$anonfun$3, ) - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, ) - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1) - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(startdate#2)) - writeObject data (class: scala.collection.immutable.$colon$colon) - object (class scala.collection.immutable.$colon$colon, List(UDF(startdate#2))) - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, name: predicates, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan, InMemoryColumnarTableScan [phone_number#0,name#1,startdate#2], [UDF(startdate#2)], InMemoryRelation [phone_number#0,name#1,startdate#2], true, 10000, StorageLevel(false, true, false, true, 1), ConvertToUnsafe, None ) - field (class: org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan) - object (class org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan$$anonfun$doExecute$1, ) - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1) - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, ) - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, ) - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[8] at rdd at DateFormatConstraint.scala:32) - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@316975be) - writeObject data (class: scala.collection.immutable.$colon$colon) - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@316975be)) - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9] at rdd at DateFormatConstraint.scala:32) - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@526fbb80) - writeObject data (class: scala.collection.immutable.$colon$colon) - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@526fbb80)) - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq) - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10] at rdd at DateFormatConstraint.scala:32) - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD) - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, ) - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1) - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ... 39 more
CODE SNIPPET :
val fun = (df: DataFrame) => {
format.setLenient(false)
val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count);
/** Utility to persist all of the bad records **/
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
//Writing all Bad records
//val intermediateYriteToHiveDf = df.filter(cannotBeDate(new Column(columnName)))
val writeToHiveDf = df.filter(cannotBeDate(new Column(columnName)))
var recordLists = new ListBuffer[List[(String, String, String)]]()
writeToHiveDf.rdd.collect().foreach {
row =>
val item = row.mkString("-")
val recordList: List[(String, String, String)] = List(List(tableName, "ALWAYS_NULL_CONSTRAINT", item))
.map { case List(a, b, c) => (a, b, c) }
recordLists += recordList
}
val listRDD = sc.parallelize(recordLists.flatten)
val dataFrameToHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
dataFrameToHive.write.mode("append").saveAsTable("xdqdemo.bad_records")
DateFormatConstraintResult(
this,
data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
status = ConstraintUtil.tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
)
}
format.parse
that is causing the problem. You haven't included the initialisation of thisformat
-variable in the snippet, but either theformat
-class is not serializable in itself or the class from which you are initializingformat
is not serializable (most likely the latter) – Glennie Helles Sindholtfun
-function declared and where is theformat
variable instantiated? And does the classes include any class variables or other inner classes? – Glennie Helles Sindholt