
I have a small table (2k records) and a big table (5 million records). I need to fetch all rows from the small table and only the matching rows from the large table, so I ran the query below:

select /*+ broadcast(small) */ small.* from small left outer join large

The query returns the correct result, but when I check the query plan it shows a sort merge join instead of a broadcast hash join. Is there a limitation that the small table can't be broadcast when it is the left table, and if so, what's the way out?


2 Answers

7 votes

Because you want the complete dataset from the small table rather than from the big table, Spark does not enforce a broadcast join. If you change the join sequence or convert the outer join to an inner join, Spark will happily use the broadcast join.

Eg:

  1. Big-Table left outer join Small-Table   -- Broadcast enabled
  2. Small-Table left outer join Big-Table   -- Broadcast disabled

Reason: Spark ships the small table (a.k.a. the broadcast table) to every node where data of the big table is present. In your case you need all rows from the small table but only the matching rows from the big table. With the small table broadcast, each node would join its own partition of the big table against a full copy of the small table, so no single node can tell whether an unmatched small-table row found a match on some other node or had no match at all. Because of this ambiguity, Spark cannot correctly emit all rows of the small table from a broadcast copy, so it does not use a broadcast join in this case.
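To see the ambiguity concretely, here is a hypothetical plain-Python sketch (not Spark code; the names `small`, `big_partitions`, and `executor_join` are made up for illustration) of what would happen if each executor naively left-outer-joined its own big-table partition against a broadcast copy of the small table:

```python
# Hypothetical sketch (plain Python, not Spark): each "executor" holds one
# partition of the big table plus a full broadcast copy of the small table.
small = [(1, 'a'), (2, 'b')]                 # preserved (left) side, broadcast
big_partitions = [[(1, 'x')], [(3, 'y')]]    # big table split over 2 executors

def executor_join(big_part, small_copy):
    # Naive per-executor left outer join: the executor only sees ITS
    # partition of the big table, so it cannot know whether an unmatched
    # small-table row matches on some other executor.
    out = []
    for k, v in small_copy:
        matches = [(k, v, bv) for bk, bv in big_part if bk == k]
        out.extend(matches if matches else [(k, v, None)])
    return out

rows = [r for part in big_partitions for r in executor_join(part, small)]
# rows contains (2, 'b', None) twice (once per executor), and BOTH
# (1, 'a', 'x') and the contradictory (1, 'a', None) -- exactly the
# ambiguity that prevents Spark from broadcasting the preserved side.
```

Broadcasting the non-preserved (big) side would avoid this, but a 5-million-row table is usually too large to broadcast, which is why flipping the join as described below is the practical fix.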

1 vote

Change the order of the tables: you are doing a left join while broadcasting the left table, but in a left join only the right table can be broadcast. So either move the broadcast table to the right side, or change the join type to a right outer join.

select /*+ broadcast(small)*/ small.* From small right outer join large
select /*+ broadcast(small)*/ small.* From large left outer join small

Example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])

#broadcasting the right-side df1 and performing a left join -> BroadcastHashJoin
df.join(broadcast(df1),['id'],'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
#   :- Scan ExistingRDD[id#0L,name#1]
#   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
#      +- *(1) Filter isnotnull(id#4L)
#         +- Scan ExistingRDD[id#4L,name#5]


#broadcasting df1 in a right join is not supported, so Spark falls back to SortMergeJoin
df.join(broadcast(df1),['id'],'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
#   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
#   :  +- Exchange hashpartitioning(id#0L, 200)
#   :     +- *(1) Filter isnotnull(id#0L)
#   :        +- Scan ExistingRDD[id#0L,name#1]
#   +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
#      +- Exchange hashpartitioning(id#4L, 200)
#         +- Scan ExistingRDD[id#4L,name#5]