I have a dataframe that comes with a bunch of columns, but I only need a couple of them. I created a case class to model the dataframe, hoping the undeclared columns would get dropped, but that doesn't happen. After a while I found this: https://issues.apache.org/jira/browse/SPARK-19477
Apparently that's how it used to work, but not anymore in Spark 2+, because Dataset.as[T] is lazy. One user there, Christophe Préaud, said that a workaround is mapping the dataset with identity, as in ds.map(identity).
That works for me locally:
case class Customer(customerId: String, email: String)

val dsCustomer = dfCustomer.as[Customer] // lazy: the extra columns are still in the plan
dsCustomer.map(identity)                 // the identity round-trip drops them
However, running the same code on a Zeppelin cluster throws Task not serializable. Here is the full exception:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
... 67 elided
Caused by: java.io.NotSerializableException: com.propzmedia.mcloud.spark.context.Context
Serialization stack:
- object not serializable (class: com.propzmedia.mcloud.spark.context.Context, value: com.propzmedia.mcloud.spark.context.Context@217cf66)
- field (class: $iw, name: context, type: class com.propzmedia.mcloud.spark.context.Context)
- object (class $iw, $iw@2edbe0e2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e562fd)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5120325e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@a6b8aaf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d4f4d73)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@421db8eb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1a5b5058)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7ae4d738)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@349d00d2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f848718)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@41c748a5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e0582d5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6ea14094)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d75d38)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@366f9a2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7a1cac8a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@46b9801a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a3b9ad5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e75f3ec)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@fd19d93)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e4b8238)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@428ac521)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@67a3f236)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2f36aae7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4dc2f0cb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@63dcb5c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4fd575eb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e835e49)
- field (class: $line116227906798.$read, name: $iw, type: class $iw)
- object (class $line116227906798.$read, $line116227906798.$read@33de578)
- field (class: $iw, name: $line116227906798$read, type: class $line116227906798.$read)
- object (class $iw, $iw@52f8a56e)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@a4e44c3)
- element of array (index: 3)
- array (class [Ljava.lang.Object;, size 11)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 98 more
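If I'm reading the serialization stack right, the object Spark chokes on is a Context instance from our own code, dragged in through the chain of $iw wrapper objects that Zeppelin (like the Spark shell) generates around each notebook paragraph. One mitigation I've seen suggested for this kind of REPL capture is marking the offending reference @transient so Java serialization skips it; a sketch in notebook-paragraph form, with a stand-in class for our real Context:

// Stand-in for com.propzmedia.mcloud.spark.context.Context (not Serializable)
class Context

// @transient tells Java serialization to skip this field of the generated
// $iw wrapper, so the wrapper chain captured by the closure can still be
// serialized even though Context itself is not.
@transient val context = new Context()

// With the reference transient, the identity round-trip should no longer
// trip over the context:
dfCustomer.as[Customer].map(identity).show()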
Still, I thought all case classes in Scala were serializable, so I wonder whether this is an issue with Spark Datasets rather than with Scala per se. Or is it the identity function passed to map that is not serializable? Does anyone have any idea?
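In the meantime, the fallback I'm considering avoids shipping a closure altogether: selecting only the columns I need prunes the dataframe eagerly in the query plan, so nothing from the notebook scope has to be serialized (a sketch, assuming the Customer case class above):

import org.apache.spark.sql.functions.col

// select prunes the columns in the query plan; no user closure is shipped
// to the executors, so the notebook scope is never serialized.
val dsCustomer = dfCustomer
  .select(col("customerId"), col("email"))
  .as[Customer]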