
I have a DataFrame that comes with a bunch of columns, but I only need a couple of them. I created a case class to model said DataFrame, hoping the undeclared columns would get dropped, but that doesn't happen. After a while I found this: https://issues.apache.org/jira/browse/SPARK-19477

Apparently that's how it used to work, but it's no longer the case in Spark 2+ because Dataset.as[T] is lazy. One user there, Christophe Préaud, said that a workaround is mapping the dataset with the identity function, as in: ds.map(identity).

That works for me locally:

case class Customer(customerId: String,
                    email: String)

val dsCustomer = dfCustomer.as[Customer]
dsCustomer.map(identity)

However, running that on a Zeppelin cluster throws Task not serializable; here is the full exception:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
  ... 67 elided
Caused by: java.io.NotSerializableException: com.propzmedia.mcloud.spark.context.Context
Serialization stack:
    - object not serializable (class: com.propzmedia.mcloud.spark.context.Context, value: com.propzmedia.mcloud.spark.context.Context@217cf66)
    - field (class: $iw, name: context, type: class com.propzmedia.mcloud.spark.context.Context)
    - object (class $iw, $iw@2edbe0e2)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e562fd)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5120325e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@a6b8aaf)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@d4f4d73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@421db8eb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1a5b5058)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7ae4d738)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@349d00d2)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3f848718)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@41c748a5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6e0582d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6ea14094)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@d75d38)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@366f9a2a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7a1cac8a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@46b9801a)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2a3b9ad5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e75f3ec)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@fd19d93)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e4b8238)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@428ac521)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@67a3f236)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@2f36aae7)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4dc2f0cb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@63dcb5c)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4fd575eb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e835e49)
    - field (class: $line116227906798.$read, name: $iw, type: class $iw)
    - object (class $line116227906798.$read, $line116227906798.$read@33de578)
    - field (class: $iw, name: $line116227906798$read, type: class $line116227906798.$read)
    - object (class $iw, $iw@52f8a56e)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@a4e44c3)
    - element of array (index: 3)
    - array (class [Ljava.lang.Object;, size 11)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 98 more

I thought all case classes in Scala were serializable, so I wonder whether this is an issue with Spark Datasets rather than with Scala per se, or whether it's the identity function passed to map that isn't serializable. Does anyone have any idea?
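For what it's worth, this part of the question can be checked without Spark at all: case class instances are indeed serializable, but the closure passed to map drags along everything it captures. A minimal sketch below reproduces both facts with plain Java serialization, which is the same mechanism Spark's ClosureCleaner uses to verify a task closure (the Context class here is a hypothetical stand-in for the non-serializable com.propzmedia.mcloud.spark.context.Context from the stack trace):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

case class Customer(customerId: String, email: String)

// Hypothetical stand-in for the non-serializable context class
// seen in the stack trace:
class Context // note: does NOT extend Serializable

object SerializationDemo {
  // Try to Java-serialize an object, the same check Spark's
  // ClosureCleaner performs before shipping a task closure.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // A closure that captures the non-serializable context:
  def capturing(ctx: Context): Customer => Customer =
    c => { require(ctx != null); c }

  def main(args: Array[String]): Unit = {
    // The case class instance itself serializes fine:
    println(canSerialize(Customer("42", "jane@example.com"))) // true
    // ...but a closure that captures the context does not:
    println(canSerialize(capturing(new Context)))             // false
  }
}
```

So the case class is not the problem; the closure's captured environment is. In a Zeppelin/REPL paragraph, even `identity` gets wrapped in generated `$iw` objects that pull in everything defined in scope, which is exactly the chain of `$iw` fields shown in the serialization stack above.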

Can you accept the answer or explain why it doesn't work? – QuickSilver

1 Answer


From the logs you have provided, it seems that you have created an object of the com.propzmedia.mcloud.spark.context.Context class and used/passed it in code that runs on the Spark executors. Since Context is not serializable, Spark cannot ship the closure that captures it, which is why even the trivial map(identity) fails.