8 votes

While testing for a production use case, I have created and saved (using the Hive Metastore) the following tables:

table1:
fields: key1, key2, value1
sortedBy: key1, key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1, key2
bucketBy: key1, 100 buckets
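For reference, tables like these can be produced with Spark's bucketing support when writing through the metastore; a minimal sketch (Scala, assuming df1 and df2 are DataFrames with the columns listed above) might be:

df1.write
  .bucketBy(100, "key1")      // 100 buckets on key1
  .sortBy("key1", "key2")     // sorted within each bucket
  .saveAsTable("table1")

df2.write
  .bucketBy(100, "key1")
  .sortBy("key1", "key2")
  .saveAsTable("table2")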

I’m running a query like this (in pseudocode):

table1.join(table2, ["key1", "key2"])
 .groupBy("value2")
 .countUnique("key1")
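In actual (Scala) DataFrame code this would look roughly like the sketch below, assuming the tables are read back from the metastore and that countUnique stands for countDistinct:

import org.apache.spark.sql.functions.countDistinct

// Equi-join on both keys, then count distinct key1 values per value2.
val t1 = spark.table("table1")
val t2 = spark.table("table2")
val result = t1.join(t2, Seq("key1", "key2"))
  .groupBy("value2")
  .agg(countDistinct("key1"))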

Common sense says that this join should simply be done with a sort-merge join with no exchange; however, Spark does an exchange and then the join.

Even though for this particular use case I could have bucketed by both keys, other use cases require me to bucket by key1 only. And when I do a (simpler) join using a single key like this:

table1.join(table2, ["key1"])

It works as expected (i.e. sort-merge join with no exchange).

Now that I have an optimized join on these tables, if I want to filter, like so:

table1.join(table2, ["key1"])
 .filter(table1.col("key2") == table2.col("key2"))

It reverts back to doing the exchange and then the join.

How can I convince Spark not to make an exchange when the join key is a superset of the bucketBy key?

Note:

One trick I know: if I rewrite the equality check as a pair of inequality checks, Spark does not shuffle.

(x == y) can also be expressed as ((x >= y) & (x <= y)). If I apply two filters like this in the last example:

.filter(table1.col("key2") >= table2.col("key2"))
.filter(table1.col("key2") <= table2.col("key2"))

It will continue using a sort-merge join with no exchange; however, this is not a solution, it is a hack.
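For completeness, a runnable (Scala) version of that workaround might look like the sketch below; explain() can be used to check whether an Exchange shows up in the physical plan:

val t1 = spark.table("table1")
val t2 = spark.table("table2")

// Equality on key2 rewritten as two range filters so that it is not
// extracted into the equi-join keys.
val joined = t1.join(t2, Seq("key1"))
  .filter(t1("key2") >= t2("key2"))
  .filter(t1("key2") <= t2("key2"))

joined.explain()   // inspect the physical plan for Exchange nodes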


4 Answers

4 votes

Based on some research and exploration, this seems to be the least hacky solution:

Building on this example:

table1.join(table2, ["key1"])
      .filter(table1.col("key2") == table2.col("key2"))

Instead of using equalTo (==) from Spark, implementing a custom MyEqualTo (delegating to the Spark EqualTo implementation is fine) seems to solve the issue. This way, Spark won't optimize(!) the join, and it will just pull the filter up into the SortMergeJoin.

Similarly, the join condition can also be formed as:

(table1.col("key1") == table2.col("key1")) AND
table1.col("key2").myEqualTo(table2.col("key2"))
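A rough sketch of what this could look like in Scala follows. It relies on internal Catalyst APIs (which change between Spark versions), and the class and helper names are only illustrative; in compiled code the implicit class would need to live inside an object.

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, EqualTo, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BooleanType, DataType}

// Not recognised by ExtractEquiJoinKeys as an equality, so it stays a plain
// filter condition instead of being pulled into the join keys.
case class MyEqualTo(left: Expression, right: Expression)
  extends BinaryExpression with CodegenFallback {

  // Delegate the actual comparison to Spark's own EqualTo.
  private lazy val delegate = EqualTo(left, right)

  override def dataType: DataType = BooleanType
  override def eval(input: InternalRow): Any = delegate.eval(input)
}

// Convenience syntax for the DataFrame API.
implicit class MyEqualToSyntax(c: Column) {
  def myEqualTo(other: Column): Column = new Column(MyEqualTo(c.expr, other.expr))
}

// Usage (t1, t2 loaded via spark.table):
// t1.join(t2, Seq("key1")).filter(t1("key2") myEqualTo t2("key2"))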
0 votes

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin is the optimizer rule that pushes the predicate through the join.
We can exclude this rule from the list of optimizer rules; that way we don't have to make any changes to user code. To exclude it, we can do one of the following:
1. Pass --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin on the command line.
2. Add the property to spark-defaults.conf.
3. Add set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin to the user code (see the example below).
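For example, option 3 could look like this in Scala user code; the spark.sql.optimizer.excludedRules setting is available from Spark 2.4 onwards:

// Exclude the rule at runtime for the current session.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin")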

Again, this is a hack.
Ideally, the filters should be pushed down through the join, which reduces the number of rows to be joined.

Update:
1. I was wrong about the pushdown. There will be no filter pushdown, since the predicate has columns from both tables.
2. Why does SortMergeJoin (SMJ) not add additional exchanges when the where clause has a "non-equality" predicate?
This is because SMJ can consider only equality-based predicates as part of the join condition: org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply.

And EnsureRequirements, which is responsible for adding the exchange, sees that SMJ does not have a new join condition and that the output distribution is already satisfied (code: org.apache.spark.sql.execution.exchange.EnsureRequirements#ensureDistributionAndOrdering).
3. Which is more efficient: adding a UDF which performs equals, or representing the predicate as a combination of greater-than and less-than?
To evaluate this, I checked the generated code using:

val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

a. UDF equals - involves the additional overhead of virtual function calls.
b. Combination of less-than and greater-than - no virtual function calls. Once a joined row is found (using key1), the code checks the other predicates one by one.

From the observations in 3 above, using the non-equality-based predicates seems to be more efficient.

0 votes

Based on your pseudocode:

table1.join(table2, ["key1", "key2"])
 .groupBy("value2")
 .countUnique("key1")

I guess the solution would be:

As a first step, just join the tables and get the DataFrame:

df = table1.join(table2, ["key1", "key2"])

Then group by and do the distinct counts:

df.select("value2", "key1").distinct().groupBy("value2", "key1").count().show()
0 votes

I am facing the same issue. It seems there is a finished PR that solves exactly this problem:

(PR) https://github.com/apache/spark/pull/19054

(Jira ticket) https://issues.apache.org/jira/browse/SPARK-18067

But I would have expected it to be included already (I am using Spark 3.0.0 and the issue is still there, even though the ticket was resolved on 21 May 2019, more than a year before the release of Spark 3).

Thanks for the "hack" using inequality operators; it doesn't feel great, but it's an easy workaround. I will try to patch my Spark version with the solution from the PR as well, but that is less sustainable/reproducible if I want to share my code.