23 votes

To broadcast a variable so that it occurs exactly once in memory per node on a cluster, one can do val myVarBroadcasted = sc.broadcast(myVar) and then retrieve it in RDD transformations like so:

myRdd.map(blar => {
  val myVarRetrieved = myVarBroadcasted.value
  // some code that uses it
})
.someAction
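
For concreteness, here is a minimal self-contained sketch of that pattern, assuming a local SparkContext; BroadcastExample, lookup, and the sample data are illustrative names of mine, not from any real codebase:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-example").setMaster("local[*]"))

    // Shipped once per executor rather than once per task.
    val lookup = Map(1 -> "one", 2 -> "two", 3 -> "three")
    val lookupBroadcasted = sc.broadcast(lookup)

    val result = sc.parallelize(Seq(1, 2, 3))
      .map(key => lookupBroadcasted.value.getOrElse(key, "unknown"))
      .collect()

    println(result.mkString(", "))  // one, two, three
    sc.stop()
  }
}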

But suppose now I wish to perform some more actions with a new broadcast variable - what if I haven't got enough heap space because of the old broadcast variables? I want a function like

myVarBroadcasted.remove()

Now I can't seem to find a way of doing this.

Also, a very related question: where do the broadcast variables go? Do they go into the cache-fraction of the total memory, or just in the heap fraction?


2 Answers

34 votes

If you want to remove the broadcast variable from both the executors and the driver, you have to use destroy; unpersist only removes it from the executors:

myVarBroadcasted.destroy()

This method is blocking.
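
A short sketch of the lifecycle (myRdd and myVarBroadcasted are taken from the question; the lookup data is just illustrative):

val myVarBroadcasted = sc.broadcast(Map(1 -> "one", 2 -> "two"))

// Use it in as many jobs as needed while it is alive.
val counts = myRdd.map(key => myVarBroadcasted.value.getOrElse(key, "unknown")).count()

// Release the data on both the executors and the driver.
myVarBroadcasted.destroy()

// After destroy the variable cannot be used again: Spark throws a
// SparkException if a later job tries to read myVarBroadcasted.value.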

14 votes

You are looking for unpersist, available since Spark 1.0.0:

myVarBroadcasted.unpersist(blocking = true)

Broadcast variables are stored as ArrayBuffers of deserialized Java objects or as serialized ByteBuffers. (Storage-wise they are treated similarly to RDDs - confirmation needed.)

The unpersist method removes them from both memory and disk on each executor node. The variable stays on the driver node, though, so it can be re-broadcast.
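
A minimal sketch of that behaviour, reusing myRdd and myVarBroadcasted from the question:

// First job ships the value to the executors.
myRdd.map(x => (x, myVarBroadcasted.value)).count()

// Drop the executor-side copies (blocking until they are gone).
myVarBroadcasted.unpersist(blocking = true)

// The driver still holds the value, so a later job simply re-broadcasts it.
myRdd.map(x => (x, myVarBroadcasted.value)).count()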