4 votes

I have a query in Spark SQL which is using broadcast join as expected as my table b is smaller than spark.sql.autoBroadcastJoinThreshold.
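
For reference, the threshold can be inspected or adjusted through the SQLContext configuration. A minimal sketch, assuming a Spark 1.x spark-shell session (the 50 MB value is purely illustrative):

// Raise the broadcast threshold (in bytes) if b is close to the 10 MB default -- illustrative value only
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// Check the effective value with a plain SQL SET command
sql("SET spark.sql.autoBroadcastJoinThreshold").collect().foreach(println)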

However, if I put the exact same select query into a CTAS query, it does NOT use a broadcast join for some reason.

The select query looks like this:

select id,name from a join b on a.name = b.bname;

And the explain for this looks like this:

== Physical Plan ==
Project [id#1,name#2]
+- BroadcastHashJoin [name#2], [bname#3], BuildRight
   :- Scan ParquetRelation: default.a[id#1,name#2] InputPaths: ...
   +- ConvertToUnsafe
      +- HiveTableScan [bname#3], MetastoreRelation default, b, Some(b)

Then my CTAS looks like this:

create table c as select id,name from a join b on a.name = b.bname; 

And the explain for this one returns:

== Physical Plan ==
ExecutedCommand CreateTableAsSelect [Database:default}, TableName: c, InsertIntoHiveTable]
+- Project [id#1,name#2]
   +- Join Inner, Some((name#2 = bname#3))
      :- Relation[id#1,name#2] ParquetRelation: default.a
      +- MetastoreRelation default, b, Some(b)

Is it expected that the select query inside a CTAS does NOT use a broadcast join? Either way, is there a way to force the CTAS to use one?


1 Answer

1 vote

If your question is about why Spark creates two different physical plans here, then this answer won't be helpful. I have observed plenty of sensitivity in Spark's optimizer, where the same SQL snippet can produce meaningfully different physical plans even when it is not obvious why.

However, if your question is ultimately about how to execute the CTAS with a broadcast join, then here is a simple workaround I have used many times: register the query that produces the plan you want as a temporary table (or a view, if you are working in the SQL console), and then use a select * from that temporary table as the query that feeds the CTAS.

In other words, something like:

sql("select id, name from a join b on a.name = b.bname").registerTempTable("tmp_joined")
sql("create table c as select * from tmp_joined")