12

We use broadcast hash join in Spark when we have one dataframe small enough to get fit into memory. When the size of small dataframe is below spark.sql.autoBroadcastJoinThreshold I have few questions around this.

What is the life cycle of the small dataframe which we hint as broadcast? For how long it will remain in memory? How can we control it?

For example if I have joined a big dataframe with small dataframe two times using broadcast hash join. when first join performs it will broadcast the small dataframe to worker nodes and perform the join while avoiding shuffling of big dataframe data.

My question is that for how long will executor keep a copy of broadcast dataframe? Will it remain in memory till session ends? Or it will get cleared once we have taken any action. can we control or clear it? Or I am just thinking in wrong direction...

3 Answers 3

17

The answer to your question, at least in Spark 2.4.0, is that the dataframe will remain in memory on the driver process until the SparkContext is completed, that is, until your application ends.

Broadcast joins are in fact implemented using broadcast variables, but when using the DataFrame API you do not get access to the underling broadcast variable. Spark itself does not destroy this variable after it uses it internally, so it just stays around.

Specifically, if you look at the code of BroadcastExchangeExec (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala), you can see that it creates a private variable relationFuture which holds the Broadcast variable. This private variable is only used in this class. There is no way for you as a user to get access to it to call destroy on it, and nowhere in the curretn implementation does Spark call it for you.

Sign up to request clarification or add additional context in comments.

6 Comments

..Thanks .. This one making some sense now.
This Spark bug report has a good discussion of the issue. issues.apache.org/jira/browse/…
Thanks a lot.do you have any idea about iterative broadcast hash join? I was trying to code that.. not succeeded.
I don't, although I may come back to this sometime next month and work on it again. For my use case we worked around the problem for now.
Thanks. Please let me know if you get any insight. I would be grateful to you. 😊
|
2

The idea here is to create broadcast variable before join to easily control it. Without it you can't control these variables - spark do it for you.

Example:

from pyspark.sql.functions import broadcast sdf2_bd = broadcast(sdf2) sdf1.join(sdf2_bd, sdf1.id == sdf2_bd.id) 

To all broadcast variables(automatically created in joins or created by hands) this rules are applied:

  1. The broadcast data is sent only to the nodes that contain an executor that needs it.
  2. The broadcast data is stored in memory. If not enough memory is available, the disk is used.
  3. When you are done with a broadcast variable, you should destroy it to release memory.

12 Comments

vikrant is talking about broadcast hint for joins not broadcast variables
@ArnonRotem-Gal-Oz broadcast joins leverage broadcast variables underneath so same rules apply
Yes except step 3 - you don't have a variable handle to destroy so you don't control its life cycle
@Arnon. I agree with you
@luminousmen you're wrong - calling unpersist() on sdf2_bd (in your example) has no effect because it isn't persisted in the dataframe sense (you can make it persistent- you can call sdf2_bd.cache() twice and you'd see that the second time will give you a warning that it is already cached (but not the first time). calling unpersist on the dataframe releases this. when you mark a dataframe with broadcast it only marks it with a hint which is resolved when spark builds the join. but you can't control the lifecycle
|
2

Here are some additional findings after some research I did regarding the options on broadcasting.

Let's consider the next example:

import org.apache.spark.sql.functions.{lit, broadcast} val data = Seq( (2010, 5, 10, 1520, 1), (2010, 5, 1, 1520, 1), (2011, 11, 25, 1200, 2), (2011, 11, 25, 1200, 1), (2012, 6, 10, 500, 2), (2011, 11, 5, 1200, 1), (2012, 6, 1, 500, 2), (2011, 11, 2, 200, 2)) val bigDF = data .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber") .select("Year", "Month", "Day", "SalesAmount") val smallDF = data .toDF("Year", "Month", "Day", "SalesAmount", "StoreNumber") .where($"Year" === lit(2011)) .select("Year", "Month", "Day", "StoreNumber") val partitionKey = Seq("Year", "Month", "Day") val broadcastedDF = broadcast(smallDF) val joinedDF = bigDF.join(broadcastedDF, partitionKey) 

As expected the execution plan for the joinedDF should look as the next one:

== Physical Plan == *(1) Project [Year#107, Month#108, Day#109, SalesAmount#110, StoreNumber#136] +- *(1) BroadcastHashJoin [Year#107, Month#108, Day#109], [Year#132, Month#133, Day#134], Inner, BuildRight, false :- LocalTableScan [Year#107, Month#108, Day#109, SalesAmount#110] +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, int, false], input[1, int, false], input[2, int, false])) +- LocalTableScan [Year#132, Month#133, Day#134, StoreNumber#136] 

Which would be probably the same without the explicit broadcast as well since the smallDF is quite small and it will fit to the default broadcast size (10MB).

Now, I would expect that I would be able to access the broadcasted dataframe from the dependencies of joinedDF hence I try to access the broadcast df by printing out the rdd.id for all the dependencies of joinedDF and the broadcastedDF through a helper function:

import org.apache.spark.rdd._ def printDependency(rdd : RDD[_], indentation: String = "") : Unit = { if (rdd == null) return; println(s"$indentation Partition Id: ${rdd.id} ") rdd.dependencies.foreach { d => printDependency(d.rdd, s"$indentation ")} } println(s"Broadcasted id: ${broadcastedDF.rdd.id}") printDependency(joinedDF.rdd) //Output // // Broadcasted id: 164 // // Partition Id: 169 // Partition Id: 168 // Partition Id: 167 // Partition Id: 166 // Partition Id: 165 

Surprisingly I realized that the broadcasted dataframe is not included/considered a part of the DAG for the joinedDF, which make sense since once we broadcasted the instance of the smallDF we don't want to trace its changes any more and of course Spark is aware of that.

One way of freeing a broadcast dataset is by using unpersist as shown below:

val broadcastedDF = smallDF.hint("broadcast") val joinedDF = bigDF.join(broadcastedDF, partitionKey) broadcastedDF.unpersist() 

A second way is by working with the sparkContext API directly, as shown below:

val broadcastedDF = spark.sparkContext.broadcast(smallDF) val joinedDF = bigDF.join(broadcastedDF.value, partitionKey) broadcastedDF.destroy() // or unpersist for async 

Although this will delete the broadcast instance itself and not the underlying smallDF. The last one will be marked for deletion and not removed immediately depending if there are additional references on it. This will work in combination with ContextCleaner class and more specifically will be controlled by the keepCleaning method which tries to remove RDDs, Accumulators, Shuffles and Checkpoints that are not needed any more asynchronously during the program execution or when the context ends (as already mentioned).

The second way (and the safer in my opinion) to remove the dependencies of joinedDF that are not longer used is through the methods df.persist(), df.checkpoint(), rdd.persist() and rdd.checkpoint(). All the mentioned methods will end up calling registerRDDForCleanup or registerForCleanup methods of the ContextCleaner class in order to clean up their parent dependencies.

One obvious question that occurs is which one to use and what are differences? There are two main differences, first with checkpoint() you can reuse the output data in a second job by loading the data from the same checkpoint directory. And secondly, the dataframe API will apply additional optimizations when saving the data, there is no such a functionality available in the RDD API.

So the final conclusion is, you can prune the data of the ancestors of your RDDs by calling one of the df.persist(), df.checkpoint, rdd.persist() and rdd.checkpoint() . The pruning will occur during the job execution and not just when the context will be terminated. Last but not least, you should not forget that the all the previous methods will be evaluated lazily and therefore take place only after executing an action.

UPDATE:

It seems that the most efficient way to force freeing memory right away for dataframes/RDDs is calling unpersist as discussed here. The code then would slightly change to:

val broadcastedDF = smallDF.hint("broadcast") val joinedDF = bigDF.join(broadcastedDF, partitionKey) broadcastedDF.unpersist() 

2 Comments

below is the link for reference.. sound intresting....databricks.com/session/…
I checked the mentioned video Vikrant although it seems to me that here stackoverflow.com/questions/53524062/efficient-pyspark-join/… you have already implemented a similar scenario with the difference that you must broadcast the data from EMP_DF2 based on par_id and then join it with innerjoin_EMP. Also the iterative broadcast will be appropriate for data skew scenarios, I believe that it wouldnt be efficient in all the cases since you must use the disk storage and replace the default join behavior

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.