When using the Spark-RDD API, we can use broadcast-variables to optimize the way spark distributes immutable state.

1) How do broadcast-variables work internally?

My assumption is: with every closure that is used to perform an operation on the dataset, all of its referenced variables have to be serialized, transferred over the network and restored along with the task so that the closure can be executed.

When registering a broadcast-variable like this:

val broadcastVar = sc.broadcast("hello world")

the returned object (Broadcast[String]) doesn't keep a reference to the actual object ("hello world") but only some ID. When a broadcast-variable handle is referenced from within a closure as described above, it is serialized just like every other variable - except that the handle itself doesn't contain the actual object.

When the closure is later being executed on the target nodes, the actual object ("hello world") has been already transferred to each node. When the closure hits the point where broadcastVar.value is called, the broadcast-variable-handle internally retrieves the actual object using the ID.

Is this assumption correct?

2) Is there a way to take advantage of this mechanism in Spark-SQL?

Let's say I have a set of values that is allowed.

When using the RDD-API I would create a broadcast-variable for my allowedValues:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))

Naturally, when using the Spark-SQL-API I would use the Column.isin / Column.isInCollection method for that:

dataframe.where(col("mycol").isInCollection(allowedValues))

but it seems like I can't get the advantage of a broadcast-variable this way.

Also, if I would change this piece of code to the following:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

dataframe.where(col("mycol").isInCollection(broadcastAllowedValues.value))

this part:

col("mycol").isInCollection(broadcastAllowedValues.value)
// and more importantly this part:
broadcastAllowedValues.value

will already be evaluated on the driver, resulting in a new Column object. So the broadcast variable loses its advantage here. It would even add some overhead compared to the first example ...

Is there a way to take advantage of Broadcast-Variables using the Spark-SQL-API or do I have to explicitly use the RDD-API at these points?


1 Answer


How do broadcast-variables work internally?

The broadcast data is serialized and physically shipped to all executors. The documentation on Broadcast Variables says:

"Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks."

Is there a way to take advantage of this mechanism in Spark-SQL?

Yes, there is a way to take advantage of it. Spark applies a Broadcast Hash Join by default when joining a large DataFrame with a small one.

According to the book "Learning Spark - 2nd edition", it says:

"By default Spark will use a broadcast join if the smaller data set is less than 10MB. This configuration is set in spark.sql.autoBroadcastJoinThreshold; you can decrease or increase the size depending on how much memory you have on each executor and in the driver."
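For reference, this threshold can be inspected and changed at runtime through the session configuration (a sketch, assuming an existing SparkSession named spark; the value is given in bytes):

```scala
// Read the current auto-broadcast threshold (default: 10485760, i.e. 10 MB).
val current = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Raise the threshold to 50 MB so somewhat larger lookup tables are still broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

// Setting it to -1 disables automatic broadcast joins entirely:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```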

In your case you need to put all unique allowedValues into a simple DataFrame (called allowedValuesDF below) with only one column (called allowedValues) and apply a join to filter your dataframe.

Something like this:

import org.apache.spark.sql.functions.{broadcast, col}
val result = dataframe.join(broadcast(allowedValuesDF), col("mycol") === col("allowedValues"))

Actually, you could leave out the broadcast as Spark will do a broadcast join by default.
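Putting the pieces together, a minimal sketch (assuming an existing SparkSession named spark, plus the dataframe and allowedValues from the question):

```scala
import org.apache.spark.sql.functions.{broadcast, col}
import spark.implicits._

// Turn the allowed values into a one-column DataFrame.
val allowedValuesDF = allowedValues.toSeq.toDF("allowedValues")

// An inner join keeps only the rows whose "mycol" appears in the allowed set;
// the broadcast hint ships the small side to every executor once.
val result = dataframe
  .join(broadcast(allowedValuesDF), col("mycol") === col("allowedValues"))
  .drop("allowedValues") // drop the join column to restore the original schema
```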

Edit:

In later versions of Spark you can also use join hints in the SQL syntax to tell the execution engine which strategy to use. Details are provided in the SQL documentation, and an example is provided below:

-- Join Hints for broadcast join 
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
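The same hint is also available on the DataFrame API via Dataset.hint, which is equivalent to wrapping the small side in broadcast(...) (a sketch; the table names t1 and t2 are assumed to be DataFrames):

```scala
// Sketch: the DataFrame-side equivalent of the SQL BROADCAST hint above.
// Broadcasts t1 (the small side) and joins on the common column "key".
val joined = t2.join(t1.hint("broadcast"), Seq("key"), "inner")
```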