Here are some additional findings from the research I did on the broadcasting options.
Consider the following example:
import org.apache.spark.sql.functions.{lit, broadcast}
import spark.implicits._ // needed for toDF and the $ column syntax
val data = Seq(
  (2010, 5, 10, 1520, 1),
  (2010, 5, 1, 1520, 1),
  (2011, 11, 25, 1200, 2),
  (2011, 11, 25, 1200, 1),
  (2012, 6, 10, 500, 2),
  (2011, 11, 5, 1200, 1),
  (2012, 6, 1, 500, 2),
  (2011, 11, 2, 200, 2))

// "Big" side of the join: all rows, without the store number
val bigDF = data
  .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
  .select("Year", "Month", "Day", "SalesAmount")

// "Small" side of the join: 2011 rows only, without the sales amount
val smallDF = data
  .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
  .where($"Year" === lit(2011))
  .select("Year", "Month", "Day", "StoreNumber")

val partitionKey = Seq("Year", "Month", "Day")
val broadcastedDF = broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
As expected, the execution plan for joinedDF looks like the following:
== Physical Plan ==
*(1) Project [Year#107, Month#108, Day#109, SalesAmount#110, StoreNumber#136]
+- *(1) BroadcastHashJoin [Year#107, Month#108, Day#109], [Year#132, Month#133, Day#134], Inner, BuildRight, false
   :- LocalTableScan [Year#107, Month#108, Day#109, SalesAmount#110]
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
      +- LocalTableScan [Year#132, Month#133, Day#134, StoreNumber#136]
The plan would most likely be the same even without the explicit broadcast, since smallDF easily fits under the default auto-broadcast threshold (10MB).
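As a side note, that threshold is controlled by spark.sql.autoBroadcastJoinThreshold and can be inspected or changed at runtime; a small sketch (the 50MB value is just an example):

// Print the current auto-broadcast threshold in bytes (10MB by default)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Raise the threshold to 50MB, or set it to -1 to disable automatic broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)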
Now, I would expect to be able to access the broadcasted dataframe from the dependencies of joinedDF, so I try to locate it by printing the rdd.id of broadcastedDF and of every dependency of joinedDF through a helper function:
import org.apache.spark.rdd._

// Recursively prints the RDD id of the given RDD and of all its dependencies
def printDependency(rdd: RDD[_], indentation: String = ""): Unit = {
  if (rdd == null) return
  println(s"$indentation RDD id: ${rdd.id}")
  rdd.dependencies.foreach { d => printDependency(d.rdd, s"$indentation ") }
}
println(s"Broadcasted id: ${broadcastedDF.rdd.id}")
printDependency(joinedDF.rdd)
//Output
//
// Broadcasted id: 164
//
// RDD id: 169
//  RDD id: 168
//   RDD id: 167
//    RDD id: 166
//     RDD id: 165
Surprisingly, I realized that the broadcasted dataframe is not part of the DAG of joinedDF. That makes sense: once we have broadcasted the instance of smallDF, we don't want to trace its changes any more, and of course Spark is aware of that.
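If you do want to verify where the broadcast ends up, it is visible in the physical plan rather than in the RDD lineage. A quick sketch (reusing the joinedDF from above) is to look for the BroadcastExchange operator:

// The BroadcastExchange operator shows up in the physical plan
joinedDF.explain()

// ... or programmatically, by inspecting the string form of the executed plan
// (with adaptive query execution enabled the final plan may only be available
// after the query has actually run)
val usesBroadcast = joinedDF.queryExecution.executedPlan.toString.contains("BroadcastExchange")
println(s"Join uses a broadcast exchange: $usesBroadcast")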
One way of freeing a broadcast dataset is by calling unpersist, as shown below:
val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
broadcastedDF.unpersist()
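Note that Dataset.unpersist is non-blocking by default; if you need to wait until the blocks have actually been removed, you can pass blocking = true (a small sketch):

// Block until the cached/broadcast data has really been dropped
broadcastedDF.unpersist(blocking = true)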
A second way is by working with the sparkContext API directly, as shown below:
val broadcastedDF = spark.sparkContext.broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF.value, partitionKey)
broadcastedDF.destroy() // or unpersist(), which asynchronously removes only the executor-side copies
Note that this deletes the broadcast instance itself, not the underlying smallDF. The broadcast is marked for deletion rather than removed immediately, depending on whether there are additional references to it. This works in combination with the ContextCleaner class, and more specifically is driven by its keepCleaning method, which asynchronously removes RDDs, accumulators, shuffles, broadcasts and checkpoints that are no longer needed, both during program execution and when the context ends (as already mentioned).
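Because the ContextCleaner only kicks in when the corresponding driver-side references are garbage collected, cleanup can lag behind on long-running drivers with large heaps. Spark exposes spark.cleaner.periodicGC.interval (30min by default) to force a periodic GC on the driver; a sketch of setting it at session creation time (it has to be set before the SparkContext is created):

import org.apache.spark.sql.SparkSession

// Trigger a driver-side GC every 15 minutes so the ContextCleaner can reclaim
// unreferenced RDDs, broadcasts, shuffles and checkpoints more aggressively
val spark = SparkSession.builder()
  .appName("broadcast-cleanup-example") // example app name
  .config("spark.cleaner.periodicGC.interval", "15min")
  .getOrCreate()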
Another way (and the safer one, in my opinion) to remove the dependencies of joinedDF that are no longer used is through the methods df.persist(), df.checkpoint(), rdd.persist() and rdd.checkpoint(). All of them end up calling the registerRDDForCleanup or registerForCleanup methods of the ContextCleaner class in order to clean up their parent dependencies.
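For completeness, df.checkpoint() requires a checkpoint directory to be set first, otherwise it throws; a minimal sketch (the directory path is just an example):

// A reliable checkpoint needs a checkpoint directory, ideally on fault-tolerant storage
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

// checkpoint() is eager by default: it materializes joinedDF and truncates its lineage
val checkpointedDF = joinedDF.checkpoint()

// The plan of checkpointedDF no longer references bigDF/smallDF
checkpointedDF.explain()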
One obvious question is which one to use, and what the differences are. There are two main differences: first, with checkpoint() you can reuse the output data in a second job by loading it from the checkpoint directory; second, the dataframe API applies additional optimizations when saving the data, while no such functionality is available in the RDD API.
So the final conclusion is that you can prune the data of the ancestors of your RDDs by calling one of df.persist(), df.checkpoint(), rdd.persist() or rdd.checkpoint(). The pruning occurs during job execution, not only when the context is terminated. Last but not least, keep in mind that persist() and rdd.checkpoint() are evaluated lazily and therefore only take effect after an action is executed, whereas df.checkpoint() is eager by default.
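To illustrate the lazy part, persist() on its own does not compute or cache anything; a sketch reusing joinedDF from above:

// persist() only marks the DataFrame for caching; nothing is materialized yet
val cachedDF = joinedDF.persist()

// The first action materializes the cache, after which the ContextCleaner can
// eventually drop the parent data that is no longer referenced
cachedDF.count()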
UPDATE:
It seems that the most efficient way to free the memory of dataframes/RDDs right away is to call unpersist, as discussed here. The code then changes slightly to:
val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
broadcastedDF.unpersist()