11
votes

We use broadcast hash join in Spark when we have one dataframe small enough to get fit into memory. When the size of small dataframe is below spark.sql.autoBroadcastJoinThreshold I have few questions around this.

What is the life cycle of the small dataframe which we hint as broadcast? For how long it will remain in memory? How can we control it?

For example if I have joined a big dataframe with small dataframe two times using broadcast hash join. when first join performs it will broadcast the small dataframe to worker nodes and perform the join while avoiding shuffling of big dataframe data.

My question is that for how long will executor keep a copy of broadcast dataframe? Will it remain in memory till session ends? Or it will get cleared once we have taken any action. can we control or clear it? Or I am just thinking in wrong direction...

3

3 Answers

17
votes

The answer to your question, at least in Spark 2.4.0, is that the dataframe will remain in memory on the driver process until the SparkContext is completed, that is, until your application ends.

Broadcast joins are in fact implemented using broadcast variables, but when using the DataFrame API you do not get access to the underling broadcast variable. Spark itself does not destroy this variable after it uses it internally, so it just stays around.

Specifically, if you look at the code of BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala), you can see that it creates a private variable relationFuture which holds the Broadcast variable. This private variable is only used in this class. There is no way for you as a user to get access to it to call destroy on it, and nowhere in the curretn implementation does Spark call it for you.

2
votes

The idea here is to create broadcast variable before join to easily control it. Without it you can't control these variables - spark do it for you.

Example:

from pyspark.sql.functions import broadcast

sdf2_bd = broadcast(sdf2)
sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id)

To all broadcast variables(automatically created in joins or created by hands) this rules are applied:

  1. The broadcast data is sent only to the nodes that contain an executor that needs it.
  2. The broadcast data is stored in memory. If not enough memory is available, the disk is used.
  3. When you are done with a broadcast variable, you should destroy it to release memory.
2
votes

Here are some additional findings after some research I did regarding the options on broadcasting.

Let's consider the next example:

import org.apache.spark.sql.functions.{lit, broadcast}

val data = Seq(
(2010, 5, 10, 1520, 1),
(2010, 5, 1, 1520, 1),
(2011, 11, 25, 1200, 2),
(2011, 11, 25, 1200, 1),
(2012, 6, 10, 500, 2),
(2011, 11, 5, 1200, 1),
(2012, 6, 1, 500, 2),
(2011, 11, 2, 200, 2))

val bigDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .select("Year", "Month", "Day", "SalesAmount")

val smallDF = data
            .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber")
            .where($"Year" === lit(2011))
            .select("Year", "Month", "Day", "StoreNumber")

val partitionKey = Seq("Year", "Month", "Day")
val broadcastedDF = broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

As expected the execution plan for the joinedDF should look as the next one:

== Physical Plan ==
*(1) Project [Year#107, Month#108, Day#109, SalesAmount#110, StoreNumber#136]
+- *(1) BroadcastHashJoin [Year#107, Month#108, Day#109], [Year#132, Month#133, Day#134], Inner, BuildRight, false
   :- LocalTableScan [Year#107, Month#108, Day#109, SalesAmount#110]
   +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false]))
      +- LocalTableScan [Year#132, Month#133, Day#134, StoreNumber#136]

Which would be probably the same without the explicit broadcast as well since the smallDF is quite small and it will fit to the default broadcast size (10MB).

Now, I would expect that I would be able to access the broadcasted dataframe from the dependencies of joinedDF hence I try to access the broadcast df by printing out the rdd.id for all the dependencies of joinedDF and the broadcastedDF through a helper function:

import org.apache.spark.rdd._

def printDependency(rdd : RDD[_], indentation: String = "") : Unit = {
      if (rdd == null)
        return;

      println(s"$indentation Partition Id: ${rdd.id} ")
      rdd.dependencies.foreach { d => printDependency(d.rdd, s"$indentation ")}
}

println(s"Broadcasted id: ${broadcastedDF.rdd.id}")
printDependency(joinedDF.rdd)

//Output
//
// Broadcasted id: 164
//
// Partition Id: 169 
//   Partition Id: 168 
//    Partition Id: 167 
//     Partition Id: 166 
//      Partition Id: 165

Surprisingly I realized that the broadcasted dataframe is not included/considered a part of the DAG for the joinedDF, which make sense since once we broadcasted the instance of the smallDF we don't want to trace its changes any more and of course Spark is aware of that.

One way of freeing a broadcast dataset is by using unpersist as shown below:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)

broadcastedDF.unpersist()

A second way is by working with the sparkContext API directly, as shown below:

val broadcastedDF = spark.sparkContext.broadcast(smallDF)
val joinedDF = bigDF.join(broadcastedDF.value, partitionKey)
broadcastedDF.destroy() // or unpersist for async

Although this will delete the broadcast instance itself and not the underlying smallDF. The last one will be marked for deletion and not removed immediately depending if there are additional references on it. This will work in combination with ContextCleaner class and more specifically will be controlled by the keepCleaning method which tries to remove RDDs, Accumulators, Shuffles and Checkpoints that are not needed any more asynchronously during the program execution or when the context ends (as already mentioned).

The second way (and the safer in my opinion) to remove the dependencies of joinedDF that are not longer used is through the methods df.persist(), df.checkpoint(), rdd.persist() and rdd.checkpoint(). All the mentioned methods will end up calling registerRDDForCleanup or registerForCleanup methods of the ContextCleaner class in order to clean up their parent dependencies.

One obvious question that occurs is which one to use and what are differences? There are two main differences, first with checkpoint() you can reuse the output data in a second job by loading the data from the same checkpoint directory. And secondly, the dataframe API will apply additional optimizations when saving the data, there is no such a functionality available in the RDD API.

So the final conclusion is, you can prune the data of the ancestors of your RDDs by calling one of the df.persist(), df.checkpoint, rdd.persist() and rdd.checkpoint() . The pruning will occur during the job execution and not just when the context will be terminated. Last but not least, you should not forget that the all the previous methods will be evaluated lazily and therefore take place only after executing an action.

UPDATE:

It seems that the most efficient way to force freeing memory right away for dataframes/RDDs is calling unpersist as discussed here. The code then would slightly change to:

val broadcastedDF = smallDF.hint("broadcast")
val joinedDF = bigDF.join(broadcastedDF, partitionKey)
broadcastedDF.unpersist()