I start spark-shell with Spark 2.3.1 and these parameters:

  • --master='local[*]'
  • --executor-memory=6400M
  • --driver-memory=60G
  • --conf spark.sql.autoBroadcastJoinThreshold=209715200
  • --conf spark.sql.shuffle.partitions=1000
  • --conf spark.local.dir=/data/spark-temp
  • --conf spark.driver.extraJavaOptions='-Dderby.system.home=/data/spark-catalog/'

Then I create two Hive tables, bucketed and sorted by id: table1 and table2.

val storagePath = "path_to_orc"
val storage = spark.read.orc(storagePath)
val tableName = "table1"

sql(s"DROP TABLE IF EXISTS $tableName")
storage.select($"group", $"id").write.bucketBy(bucketsCount, "id").sortBy("id").saveAsTable(tableName)

(the same code for table2)

I expected that when I join either of these tables with another DataFrame, the query plan would contain no unnecessary Exchange step.

Then I turn off broadcast joins so that SortMergeJoin is used:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

I load a sample DataFrame:

val sample = spark.read.option("header", "true").option("delimiter", "\t").csv("path_to_tsv")

val m = spark.table("table1")
sample.select($"col" as "id").join(m, Seq("id")).explain()

== Physical Plan ==
*(4) Project [id#24, group#0]
+- *(4) SortMergeJoin [id#24], [id#1], Inner
   :- *(2) Sort [id#24 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#24, 1000)
   :     +- *(1) Project [col#21 AS id#24]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(3) Project [group#0, id#1]
      +- *(3) Filter isnotnull(id#1)
         +- *(3) FileScan parquet default.table1[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

But when I union the two tables before the join:

val m2 = spark.table("table2")
val mUnion = m union m2
sample.select($"col" as "id").join(mUnion, Seq("id")).explain()

== Physical Plan ==
*(6) Project [id#33, group#0]
+- *(6) SortMergeJoin [id#33], [id#1], Inner
   :- *(2) Sort [id#33 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#33, 1000)
   :     +- *(1) Project [col#21 AS id#33]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(5) Sort [id#1 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1, 1000)
         +- Union
            :- *(3) Project [group#0, id#1]
            :  +- *(3) Filter isnotnull(id#1)
            :     +- *(3) FileScan parquet default.membership_g043_append[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
            +- *(4) Project [group#4, id#5]
               +- *(4) Filter isnotnull(id#5)
                  +- *(4) FileScan parquet default.membership_g042[group#4,id#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

In this case an additional Exchange and Sort (step 5) appear on the union side of the join.

How can I union two Hive tables without this sorting and exchanging?

1 Answer

As far as I know, Spark does not take sorting into account when planning a join, only partitioning. So to get efficient joins, both sides must be partitioned by the same column. Sorting alone does not guarantee that records with the same key end up in the same partition, and Spark has to make sure that all rows with the same key, from every DataFrame involved, are shuffled to the same partition on the same executor.
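
To illustrate the co-location point, here is a toy model in plain Scala (no Spark involved). It assumes that a union simply concatenates the partition lists of its inputs, which is how I understand Spark's union to behave but is not confirmed by the plans in the question. The names bucketOf, bucketed, union and partitionsPerId are made up for this sketch, and bucketOf is only a stand-in for Spark's real bucket hash.

```scala
// Toy model only: plain Scala collections stand in for Spark partitions.
object UnionPartitioningSketch {

  // Hypothetical stand-in for Spark's bucket hash; any deterministic
  // function of the key is enough for this illustration.
  def bucketOf(id: String, numBuckets: Int): Int =
    java.lang.Math.floorMod(id.hashCode, numBuckets)

  // Bucket a table: partition i holds exactly the ids whose bucket is i,
  // sorted within the partition (like bucketBy + sortBy).
  def bucketed(ids: Seq[String], numBuckets: Int): Vector[Seq[String]] =
    Vector.tabulate(numBuckets)(b => ids.filter(bucketOf(_, numBuckets) == b).sorted)

  // Assumption: a union concatenates the partition lists of both inputs.
  def union(a: Vector[Seq[String]], b: Vector[Seq[String]]): Vector[Seq[String]] =
    a ++ b

  // For every id, the set of partition indices in which it occurs.
  def partitionsPerId(parts: Vector[Seq[String]]): Map[String, Set[Int]] =
    parts.zipWithIndex
      .flatMap { case (ids, idx) => ids.map(id => id -> idx) }
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2).toSet }

  def main(args: Array[String]): Unit = {
    val numBuckets = 4
    val table1 = bucketed(Seq("a", "b", "c", "d", "e"), numBuckets)
    val table2 = bucketed(Seq("a", "c", "x", "y"), numBuckets)
    val merged = union(table1, table2)

    println(s"partitions in table1: ${table1.size}")
    println(s"partitions after union: ${merged.size}")

    // Ids present in both tables now live in two different partitions.
    val spread = partitionsPerId(merged).filter(_._2.size > 1)
    println(s"ids found in more than one partition: ${spread.keys.toSeq.sorted.mkString(", ")}")
  }
}
```

In this model each input table keeps every id in exactly one partition, but after the union an id that occurs in both tables sits in two partitions. Under that assumption the "one key, one partition" guarantee no longer holds for the union, which would explain why a new Exchange (and the Sort after it) is needed before the SortMergeJoin.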