3 votes

I am using Spark 2.1.1. I have a very complex query written in Spark SQL that I am trying to optimise. For one section, I am trying to use a broadcast join. But even though I have set:

spark.sql.autoBroadcastJoinThreshold=1073741824

which is 1 GB, I see that the Spark-generated physical plan for this section of the execution still uses SortMergeJoin. Do you have any insight into why the broadcast join is not used, even though one side's size is shown as much smaller (in MBs) on the Spark UI -> SQL tab?
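For reference, the same threshold can also be set at runtime (a minimal sketch, assuming a SparkSession named spark):

// 1 GB; the default is 10 MB (10485760 bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1073741824L)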
The SQL for the affected section looks like:

-- Preceding SQL
(
SELECT  /*+ BROADCAST (a) */   -- Size of a is within broadcast threshold as per UI
     a.id,
     big.gid
 FROM
     (SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL) a
 JOIN
    big ON (a.id=big.id)
)
-- Succeeding SQL

The Spark UI screen that corroborates this:

[Screenshot of the Spark UI SQL tab; not reproduced here]

2 Answers

5 votes

Spark 2.1 doesn't support adaptive execution. It doesn't change the execution plan based on intermediate statistics (like size, max, min, etc.) after a stage is completed, so once the plan is generated before query execution, it is not changed, and you keep seeing the same plan.
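You can confirm which join Spark planned before running anything by printing the physical plan (a minimal sketch from the Scala shell; the SQL is abbreviated from the question):

// explain() prints the physical plan without executing the query.
// Look for SortMergeJoin vs. BroadcastHashJoin in the output.
spark.sql("""
  SELECT a.id, big.gid
  FROM (SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL) a
  JOIN big ON (a.id = big.id)
""").explain()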

The reason Spark is not broadcasting the left table is the missing statistics for your subquery. The way I overcame this issue is by caching the result of the subquery: once it is materialized, Spark knows its size, which helps the optimizer pick a broadcast join.

In your case, you can do something like:

CACHE TABLE cached_a AS SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL;

SELECT
     a.id,
     big.gid
 FROM
     cached_a a
 JOIN
     big ON (a.id = big.id)
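The same approach works through the DataFrame API (a sketch, assuming a SparkSession named spark and the table names from the question):

// Cache and materialize the subquery so the optimizer knows its in-memory size.
val a = spark.sql("SELECT DISTINCT(id) AS id FROM a_par WHERE gid IS NULL").cache()
a.count()  // triggers the cache to be populated
// With accurate size statistics, Spark can now choose BroadcastHashJoin on its own.
val result = a.join(spark.table("big"), "id").select("id", "gid")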

2 votes

Below are my observations and the way I made it work:

In Spark 2.1:

  • In Spark SQL:

    • The broadcast hint is of NO use.
    • Spark applies BroadcastHashJoin only if it can compute the size of the DataFrame (as per the earlier answer).
    • It can do that only if one side of the join is a bare table (in my case, a Hive table).
  • In the Spark shell:

    • We can force a DataFrame to be broadcast using broadcast(df) (see the sketch below).
    • If the DataFrame is not small enough per the thresholds set, it will fail the whole job.

So, in short, I couldn't find any way to achieve it through Spark SQL alone. I had to introduce broadcast(df) to force the broadcast join.
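For completeness, a minimal sketch of that workaround (assuming a SparkSession named spark and the tables from the question):

import org.apache.spark.sql.functions.broadcast

// Distinct ids from a_par where gid is missing, as in the original subquery.
val a = spark.table("a_par").where("gid IS NULL").select("id").distinct()
// broadcast() marks the DataFrame for broadcasting regardless of its statistics.
val result = broadcast(a).join(spark.table("big"), "id").select("id", "gid")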