2 votes

I have a simple Spark SQL query:

SELECT x, y
FROM t1 INNER JOIN t2 ON t1.key = t2.key
WHERE expensiveFunction(t1.key)

where expensiveFunction is a Spark UDF (user-defined function).
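(For reference, a minimal sketch of how such a UDF might be registered; the key type and the function body here are assumptions:)

// Hypothetical registration; assumes the key column is a String.
spark.udf.register("expensiveFunction", (key: String) => {
  // ...some costly check; returns a Boolean used by the WHERE clause
  key != null
})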

When I look at the query plan generated by Spark, I see that it has two filter operations instead of just one: it checks not only expensiveFunction(t1.key), but also expensiveFunction(t2.key).


In general, this optimization is not a bad thing, because it reduces the number of records to join, and joining is an expensive operation. But in my case, expensiveFunction(t2.key) always returns true, so I would like to remove it.

Is there a way to change the query plan before executing a query? Is there a way to indicate to Spark that I don't want a given optimization to be applied to my query?


3 Answers

2 votes

Is there a way to change the query plan before executing a query?

In general, yes. There are a few extension points in the Spark SQL query planner and optimizer that make this doable.

Is there a way to indicate to Spark that I don't want a given optimization to be applied to my query?

That's nearly impossible unless the optimization rule allows for it. In other words, you'd have to find out whether the rule has an option to turn it off, e.g. CostBasedJoinReorder with the spark.sql.cbo.enabled or spark.sql.cbo.joinReorder.enabled configuration properties (when either is off, CostBasedJoinReorder does nothing).
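For example, a minimal sketch of turning that particular rule off at runtime:

// With cost-based optimization disabled, CostBasedJoinReorder does nothing.
spark.conf.set("spark.sql.cbo.enabled", "false")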

You could write a custom logical operator that would make the optimization void (since the rule would not match an unknown logical operator), and then remove the operator again at the optimization phase.

Use extendedOperatorOptimizationRules to register custom optimizations.
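A minimal sketch of that wiring, assuming a placeholder rule named MyRule (the name and the no-op body are mine, not the asker's); SparkSessionExtensions.injectOptimizerRule registers the rule into extendedOperatorOptimizationRules:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: returns the plan unchanged; a real rule would
// transform the plan here (e.g. strip the unwanted Filter).
object MyRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions(_.injectOptimizerRule(_ => MyRule))
  .getOrCreate()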

2 votes

This is happening because of the optimizer rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints. The code comment is as follows (GitHub):

  /**
   * Infers an additional set of constraints from a given set of equality constraints.
   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
   * additional constraint of the form `b = 5`.
   */
  def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] 

In the question's query, the join condition t1.key = t2.key together with the filter expensiveFunction(t1.key) lets the rule infer expensiveFunction(t2.key), which is the second filter you see in the plan. You could disable this optimizer rule using spark.sql.optimizer.excludedRules:

val OPTIMIZER_EXCLUDED_RULES = buildConf("spark.sql.optimizer.excludedRules")
  .doc("Configures a list of rules to be disabled in the optimizer, in which the rules are " +
    "specified by their rule names and separated by comma. It is not guaranteed that all the " +
    "rules in this configuration will eventually be excluded, as some rules are necessary " +
    "for correctness. The optimizer will log the rules that have indeed been excluded.")
  .stringConf
  .createOptional

That way the filter will not get propagated to both sides of the join.
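A minimal sketch of excluding the rule at runtime (the property is available since Spark 2.4):

spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints")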

1 vote

You can rewrite this query as below to avoid the extra function call.

SELECT x, y
FROM (SELECT <required-columns> FROM t1 WHERE expensiveFunction(t1.key)) t0
INNER JOIN t2 ON t0.key = t2.key

To be extra sure, you can persist the result of this query (SELECT <required-columns> FROM t1 WHERE expensiveFunction(t1.key)) as a separate DataFrame, and then join table t2 with that DataFrame.

For example, let's say we have DataFrames df1 and df2 for tables t1 and t2 respectively. We can do something like the following to avoid calling expensiveFunction twice.

val df3 = df1.filter("expensiveFunction(key)") // assumes expensiveFunction is registered as a UDF
df3.persist() // marks df3 for caching; persist alone is lazy
df3.count()   // an action forces evaluation, materializing the filtered result
df3.createOrReplaceTempView("t1") // shadows the original t1 with the pre-filtered result
spark.sql("""SELECT t1.col1, t2.col2
FROM t1 INNER JOIN t2 ON t1.col2 = t2.col1""") // this query now has no reference to expensiveFunction