1
votes

I am trying to create an extremely simple Spark notebook using Azure Databricks and would like to make use of a simple RDD map call.

This is just for messing around, so the example is a bit contrived, but I cannot get a value to work in the RDD map call unless it is a static constant value.

I have tried using a broadcast variable.

Here is a simple example using an Int, which I broadcast and then try to use in the RDD map:

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

Here is another example, where I use a simple serializable singleton object with an Int field, which I broadcast and then try to use in the RDD map:

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()

And finally a List[Int] with a single element, which I broadcast and then try to use in the RDD map:

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()

However, ALL the examples above fail with the error below, which points towards something in the RDD map closure not being serializable. I cannot see the issue; as far as I can tell, an Int value should be serializable in all of the examples above.

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:379)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:378)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
    at org.apache.spark.rdd.RDD.map(RDD.scala:378)

However, if I make the value in the RDD map a regular Int literal like this:

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()

then everything works fine, and I see my simple DataFrame shown as expected.


Any ideas anyone?

2
In all recent versions of Spark, the Task not serializable error should be accompanied by more detailed debugging output explaining what, specifically, was not serializable. Can you post some of that here? My hunch is that closure over-capture is occurring and having the complete debug information will help to narrow that down. – Josh Rosen
@Josh just out of office today but I'll post that up tomorrow – sacha barber
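To make the "closure over-capture" idea in the comment above concrete, here is a minimal sketch in plain Scala (no Spark needed). It is only an illustration under assumptions: `Holder` is a hypothetical stand-in for a non-serializable enclosing scope, such as a notebook cell wrapper that also holds a SparkContext, and `isSerializable` is a helper invented for this example that uses plain Java serialization. It is not the check Spark's ClosureCleaner performs, but it fails for the same underlying reason.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {

  // Hypothetical enclosing scope that is NOT Serializable.
  class Holder {
    val multiplier: Int = 3

    // `multiplier` here means `this.multiplier`, so the function
    // captures the whole Holder instance, not just the Int.
    def capturing: Int => Int = x => x * multiplier

    // Copying the field into a local val first means the function
    // captures only that Int.
    def localCopy: Int => Int = {
      val m = multiplier
      x => x * m
    }
  }

  // Illustrative helper: true if plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(isSerializable(h.capturing)) // false: the closure drags in Holder
    println(isSerializable(h.localCopy)) // true: only an Int is captured
  }
}
```

The Int itself is never the problem; what matters is what else the function has to carry along in order to reach it.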

2 Answers

0
votes

From your code, I assume that you are on Spark 2+. Perhaps there is no need to drop down to the RDD level, and you could work with DataFrames instead.

The code below shows how to join two DataFrames and explicitly broadcast the first one.

import sparkSession.implicits._
import org.apache.spark.sql.functions._

val data = Seq(1, 2, 3, 4, 5)
val dataDF = data.toDF("id")

val largeDataDF = Seq((0, "Apple"), (1, "Pear"), (2, "Banana")).toDF("id", "value")
val df = largeDataDF.join(broadcast(dataDF), Seq("id"))

df.show()

Typically, small DataFrames are perfect candidates for broadcasting, an optimization whereby they are sent to all executors. spark.sql.autoBroadcastJoinThreshold is a configuration setting that limits the size of DataFrames eligible for automatic broadcast. Additional details can be found in the official Spark documentation.
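As a small illustration of that setting, the threshold could be inspected and changed from a notebook cell roughly like this. The sketch assumes `spark` is the SparkSession that Databricks notebooks predefine (as in the question). The 10 MB figure is Spark's documented default, used here only as an example value.

```scala
// Assumes `spark` is the predefined SparkSession of the notebook.

// Read the current threshold; the value is returned as a String.
val currentThreshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
println(s"autoBroadcastJoinThreshold = $currentThreshold")

// Set the threshold to 10 MB, expressed in bytes.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)

// Alternatively, -1 disables automatic broadcast joins altogether:
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
```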

Note also that with DataFrames, you have access to a handy explain method. With it, you can see the physical plan and it can be useful for debugging.

Running explain() on our example would confirm that Spark is doing a BroadcastHashJoin optimization.

df.explain()

== Physical Plan ==
*Project [id#11, value#12]
+- *BroadcastHashJoin [id#11], [id#3], Inner, BuildRight
   :- LocalTableScan [id#11, value#12]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [id#3]

If you need additional help with DataFrames, I provide an extensive list of examples at http://allaboutscala.com/big-data/spark/

0
votes

So the answer was that you should not capture the Spark context in a val and then use that val for the broadcast. This is the working code:

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = spark.sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = spark.sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

Thanks to @nadim Bahadoor for this answer