While testing for a production use case, I created and saved (via the Hive Metastore) the following tables:
table1:
  fields: key1, key2, value1
  sortBy: key1, key2
  bucketBy: key1 (100 buckets)

table2:
  fields: key1, key2, value2
  sortBy: key1, key2
  bucketBy: key1 (100 buckets)
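For reference, a minimal Scala sketch of how such tables could be written; the source DataFrames df1 and df2 and the SparkSession variable spark are assumptions for illustration, not part of the original setup:

// Assumes an existing SparkSession `spark` (e.g. spark-shell) and hypothetical
// source DataFrames df1(key1, key2, value1) and df2(key1, key2, value2).
df1.write
  .bucketBy(100, "key1")   // 100 buckets on key1
  .sortBy("key1", "key2")  // each bucket file sorted by (key1, key2)
  .saveAsTable("table1")   // registers the table in the Hive Metastore

df2.write
  .bucketBy(100, "key1")
  .sortBy("key1", "key2")
  .saveAsTable("table2")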
I'm running a query like this (in pseudocode):
table1.join(table2, Seq("key1", "key2"))
  .groupBy("value2")
  .agg(countDistinct("key1"))
Common sense says this join should simply be executed as a sort-merge join with no exchange; however, Spark does an exchange and then the join.
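To make the observation concrete, here is a sketch of the full query in Scala, assuming the tables are read back from the metastore; the explain() call is where the unwanted exchange shows up:

import org.apache.spark.sql.functions.countDistinct

val table1 = spark.table("table1")
val table2 = spark.table("table2")

table1.join(table2, Seq("key1", "key2"))
  .groupBy("value2")
  .agg(countDistinct("key1"))
  .explain()
// Under these assumptions the physical plan shows an Exchange
// hashpartitioning(key1, key2, ...) on both sides of the SortMergeJoin,
// i.e. a full shuffle despite both tables being bucketed by key1.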
Even though for this particular use case I could have bucketed by both keys, other use cases require me to bucket by key1 only. When I do a (simpler) join using a single key, like this:
table1.join(table2, Seq("key1"))
it works as expected (i.e. a sort-merge join with no exchange).
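Checked the same way with explain() (same assumptions as above), this variant produces a SortMergeJoin with no Exchange on either side, since both tables are bucketed by key1 into the same number of buckets; Spark may still insert per-partition Sort nodes:

table1.join(table2, Seq("key1")).explain()
// Expected: SortMergeJoin [key1], [key1] with no Exchange operator in the plan.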
Now that I have an optimized join on these tables, if I want to filter, like this:
table1.join(table2, Seq("key1"))
  .filter(table1.col("key2") === table2.col("key2"))
the plan reverts back to an exchange followed by the join.
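Verified the same way, the filtered variant brings the shuffle back, presumably because Spark folds the key2 equality into the equi-join keys and then requires hash partitioning on (key1, key2):

table1.join(table2, Seq("key1"))
  .filter(table1.col("key2") === table2.col("key2"))
  .explain()
// Under these assumptions the plan again contains
// Exchange hashpartitioning(key1, key2, ...) before the SortMergeJoin.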
How can I convince Spark not to do an exchange when the join key is a superset of the bucketBy key?
Note:
One trick I know: if I rewrite the equality check as a pair of inequality checks, Spark does not shuffle.
(x == y) can also be expressed as ((x >= y) & (x <= y)). If I apply two filters like this to the last example:
.filter(table1.col("key2") >= table2.col("key2"))
.filter(table1.col("key2") <= table2.col("key2"))
it will keep using a sort-merge join with no exchange; however, this is not a solution, it is a hack.
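For completeness, the workaround written out under the same assumptions; the key2 comparisons appear to stay as extra non-equi predicates rather than extending the equi-join keys, so no additional Exchange is introduced:

table1.join(table2, Seq("key1"))
  .filter(table1.col("key2") >= table2.col("key2"))
  .filter(table1.col("key2") <= table2.col("key2"))
  .explain()
// The physical plan keeps the SortMergeJoin on key1 with no Exchange,
// but this relies on the optimizer not recognizing the combined equality.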