So I am trying to create an extremely simple Spark notebook using Azure Databricks, and would like to make use of a simple RDD map call.
This is just for messing around, so the example is a bit contrived, but I cannot get a value to work in the RDD map call unless it is a static constant value.
I have tried using a broadcast variable.
Here is a simple example using an Int, which I broadcast and then try to use in the RDD map:
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
Here is another example, where I use a simple serializable singleton object with an Int field, which I broadcast and then try to use in the RDD map:
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()
And finally, a List[Int] with a single element, which I broadcast and then try to use in the RDD map:
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()
However, ALL the examples above fail with the following error, which, as you can see, points to the closure passed to the RDD map not being serializable. I cannot see the issue; as far as I can tell, an Int value should be serializable in every one of the examples above.
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:379)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:378)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.map(RDD.scala:378)
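For what it's worth, the three broadcast values do serialize when taken on their own. The quick check below is a minimal plain-Scala sketch (no Spark needed) that uses Java serialization, which is what Spark's closure check relies on. `Foo` is redefined here as a standalone object with the multiplier hard-coded to 3 (not nested inside a notebook cell), and `ValueSerializationCheck` and `isSerializable` are hypothetical helper names introduced only for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Standalone copy of the singleton from the second example, with the
// multiplier hard-coded (it is NOT nested inside a notebook cell here).
object Foo extends Serializable { val theMultiplier: Int = 3 }

object ValueSerializationCheck {
  // Hypothetical helper: true if plain Java serialization can write `obj`,
  // false if it throws NotSerializableException.
  def isSerializable(obj: Any): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj.asInstanceOf[AnyRef]) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(3))        // true (boxed to java.lang.Integer)
    println(isSerializable(Foo))      // true
    println(isSerializable(List(3)))  // true
  }
}
```

If all three values pass this check, the failure is more likely caused by something else the closure drags along with it than by the values themselves.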
If, however, I make the value in the RDD map a literal Int value, like this:
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()
everything works fine and I see my simple DataFrame shown as expected.
Any ideas, anyone?
The Task not serializable error should be accompanied by more detailed debugging output explaining what, specifically, was not serializable. Can you post some of that here? My hunch is that closure over-capture is occurring, and having the complete debug information will help to narrow that down. – Josh Rosen
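The "closure over-capture" hunch can be illustrated without Spark. Below is a minimal plain-Scala sketch, assuming Scala 2.12 or later (where function literals compile to serializable lambdas). `CellWrapper` is a hypothetical stand-in for the object a notebook cell compiles into, and `Unserializable` stands in for something like `SparkContext`. Reading a field from inside a lambda captures the whole enclosing instance, including its non-serializable fields. Copying the field into a local `val` first means only the value is captured.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable handle such as SparkContext.
class Unserializable

// Hypothetical stand-in for a notebook cell wrapper: it holds both the
// non-serializable handle and the value we actually want in the closure.
class CellWrapper {
  val context = new Unserializable
  val multiplier = 3

  // Reads the field `multiplier`, so the lambda captures `this`
  // (the whole wrapper, including `context`).
  def overCapturing: Int => Int = x => x * multiplier

  // Copies the field into a local val first, so only the Int is captured.
  def localCopy: Int => Int = {
    val m = multiplier
    x => x * m
  }
}

object ClosureCaptureDemo {
  // True if Java serialization can write `obj`, false on NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val cell = new CellWrapper
    println(isSerializable(cell.overCapturing)) // false: drags in CellWrapper
    println(isSerializable(cell.localCopy))     // true: only the Int is captured
  }
}
```

By analogy, and only as a hypothesis consistent with the comment, the notebook closures may be pulling in the cell's `sparkContext` or `sqlContext` vals along with the broadcast handle. The full debug output would confirm or rule this out.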