When using the Spark-RDD API, we can use broadcast-variables to optimize the way Spark distributes immutable state.
1) How do broadcast-variables work internally?
My assumption is: with every closure that is used to perform an operation on the dataset, all its referenced variables have to be serialized, transferred over the network and restored along with the task so that the closure can be executed.
When registering a broadcast-variable like this:
val broadcastVar = sc.broadcast("hello world")
the returned object (Broadcast[String]) doesn't keep a reference to the actual object ("hello world") but only some ID.
When a broadcast-variable-handle is referenced from within a closure as described above, it is serialized just like every other variable; the only difference is that the handle itself doesn't contain the actual object.
When the closure is later executed on the target nodes, the actual object ("hello world") has already been transferred to each node. When the closure reaches the point where broadcastVar.value is called, the broadcast-variable-handle internally retrieves the actual object using the ID.
Is this assumption correct?
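To make the assumption concrete, here is a toy model in plain Scala (no Spark needed). Everything in it is hypothetical: `Handle`, `LocalStore`, `TaskWithValue` and `TaskWithHandle` are stand-ins invented for illustration, not Spark's actual `Broadcast` implementation, and the model simply assumes the value is already in the local store before any task runs. It only shows why a "closure" that captures an ID-only handle serializes to far fewer bytes than one that captures the value itself.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.concurrent.TrieMap

// Hypothetical toy model of the assumed mechanism; NOT Spark's real Broadcast implementation.
object BroadcastToyModel {

  // Hypothetical handle: it carries only an ID, never the value itself.
  final case class Handle[T](id: Long) {
    // Mimics broadcastVar.value: resolves the ID against the node-local store.
    def value: T = LocalStore.get[T](id)
  }

  // Hypothetical node-local store that, by assumption, already holds the value
  // before any task runs (all "nodes" share one JVM in this model).
  object LocalStore {
    private val blocks = TrieMap.empty[Long, Any]
    private var nextId = 0L

    def put[T](v: T): Handle[T] = synchronized {
      nextId += 1
      blocks.update(nextId, v)
      Handle[T](nextId)
    }

    def get[T](id: Long): T = blocks(id).asInstanceOf[T]
  }

  // A "closure" that captures the value directly: the whole Set is serialized.
  final case class TaskWithValue(allowed: Set[String]) {
    def run(x: String): Boolean = allowed.contains(x)
  }

  // A "closure" that captures only the handle: just the ID is serialized.
  final case class TaskWithHandle(allowed: Handle[Set[String]]) {
    def run(x: String): Boolean = allowed.value.contains(x)
  }

  // Size of the Java-serialized form, i.e. roughly what would be shipped with each task.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.size()
  }

  val allowedValues: Set[String] = (1 to 10000).map(i => s"value-$i").toSet
  val withValue: TaskWithValue = TaskWithValue(allowedValues)
  val withHandle: TaskWithHandle = TaskWithHandle(LocalStore.put(allowedValues))

  def main(args: Array[String]): Unit = {
    println(s"task capturing the value:  ${serializedSize(withValue)} bytes")
    println(s"task capturing the handle: ${serializedSize(withHandle)} bytes")
    println(withHandle.run("value-42")) // true
  }
}
```

In this model the lookup through the handle gives the same answer as the direct lookup; only the serialized payload per task differs.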
2) Is there a way to take advantage of this mechanism in Spark-SQL?
Let's say I have a set of allowed values.
When using the RDD-API I would create a broadcast-variable for my allowedValues:
val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]
rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))
Naturally, when using the Spark-SQL-API I would use the Column.isin / Column.isInCollection method for that:
dataframe.where(col("mycol").isInCollection(allowedValues))
but it seems like I can't get the advantage of a broadcast-variable this way.
Also, if I changed this piece of code to the following:
val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]
dataframe.where(col("mycol").isInCollection(broadcastAllowedValues.value))
this part:
col("mycol").isInCollection(broadcastAllowedValues.value)
// and, more importantly, this part:
broadcastAllowedValues.value
will already be evaluated on the driver, resulting in a new Column object. So the broadcast-variable loses its advantage here. It would even add some overhead compared to the first example.
Is there a way to take advantage of Broadcast-Variables using the Spark-SQL-API or do I have to explicitly use the RDD-API at these points?
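For experimenting with this, a minimal sketch of two things one could try on the Spark-SQL side, assuming Spark 3.x with Scala 2.12/2.13 and a local session. The object name `BroadcastInSqlSketch`, the sample data and the names `isAllowed` and `allowedDf` are hypothetical. The first option keeps the lookup inside a UDF, so the UDF closure captures only `broadcastAllowedValues`, and `.value` is called when the UDF is evaluated inside a task rather than when the Column is built on the driver. The second is a different technique: a left-semi join with a `broadcast()` hint on the small side, which relies on Spark's broadcast join instead of a broadcast-variable.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, udf}

object BroadcastInSqlSketch {
  // Local session purely for illustration.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("broadcast-in-sql-sketch")
    .getOrCreate()

  // Returns the row counts of both variants.
  def run(): (Long, Long) = {
    import spark.implicits._

    // Hypothetical sample data.
    val allowedValues: Set[String] = Set("a", "c")
    val dataframe: DataFrame = Seq("a", "b", "c", "d").toDF("mycol")

    // Option 1: the UDF closure captures only the broadcast handle;
    // .value is evaluated when the UDF runs in a task, not while building the Column.
    val broadcastAllowedValues = spark.sparkContext.broadcast(allowedValues)
    val isAllowed = udf((v: String) => broadcastAllowedValues.value.contains(v))
    val viaUdf = dataframe.where(isAllowed(col("mycol")))

    // Option 2 (different technique): a left-semi join that hints Spark
    // to broadcast the small DataFrame holding the allowed values.
    val allowedDf = allowedValues.toSeq.toDF("mycol")
    val viaJoin = dataframe.join(broadcast(allowedDf), Seq("mycol"), "left_semi")

    (viaUdf.count(), viaJoin.count())
  }

  def main(args: Array[String]): Unit = {
    println(run()) // (2,2)
    spark.stop()
  }
}
```

One trade-off worth keeping in mind: a UDF is opaque to Catalyst, so a predicate expressed this way cannot be pushed down to the data source, whereas isInCollection can. Which variant is cheaper for a given set size is something to measure rather than assume.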