2
votes

I am trying to run a query in hive:

here is the simplest setup (i know i can do an = but im using a custom UDF which does more than just an equality comparison)

datasets a and b are around 30,000 rows each

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

where custom_UDF_Equals_Comparison simply does an equality check between a.id = b.id

When i run this query, i can see in my log output that a lot of m/r tasks are running, assuming its comparing across both datasets until all possible permutations are compared, and far above the LIMIT of 5 (i would expect only a handful of m/r tasks as i know most of the data can join within the first few rows of each table), why would this occur? and/or how can i fix?

edit:

hi zero323, that is a similar question but not exact, it explains why a full comparison between 2 RDD's is performed when using UDF for comparison, but it doesnt explain why LIMIT doesn't stop the comparison when the limit of 5 is found. for example if 5 rows are found in the first 10 join attempts, why does it go for the remaining 30,000 * 30,000 attempts. Is it due to the fact that limit is applied after all the joins occur? e.g. it joins 30,000*30,000 rows and then reduces them to just 5?

edit2:

  def levenshtein(str1: String, str2: String): Int = {
val lenStr1 = str1.length
val lenStr2 = str2.length

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)

for (i <- 0 to lenStr1) d(i)(0) = i
for (j <- 0 to lenStr2) d(0)(j) = j

for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
  val cost = if (str1(i - 1) == str2(j-1)) 0 else 1

  d(i)(j) = min(
    d(i-1)(j  ) + 1,     // deletion
    d(i  )(j-1) + 1,     // insertion
    d(i-1)(j-1) + cost   // substitution
  )
}

d(lenStr1)(lenStr2)

}

def min(nums: Int*): Int = nums.min

def join_views( joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = {
if (joinType == "Equals") {
  if (col1 == null || col2 == null) {
    return false
  }

  return col1 == col2
}
else if (joinType == "Fuzzy_String") {
  if (col1 == null || col2 == null) {
    return false
  }

  val val1 = col1.asInstanceOf[String]
  val val2 = col2.asInstanceOf[String]

  val ratio = Utils.distancePercentage(val1, val2)

  if (ratio == 1.0) {
    return val1 == val2
  }

  return (ratio >= parameters.asInstanceOf[Double])
}

return false;

}

... ON join_views("Fuzzy_String", "0.1", a.col1, b.col1) LIMIT 5 = 20secs

... ON join_views("Fuzzy_String", "0.9", a.col1, b.col1) LIMIT 5 = 100secs

1
it can stay closed, thanks for the helpandrew.butkus
I actually discovered something perplexing, my custom_UDF also does a fuzzy check, and when i run that with a fuzzy value of 0.1 must match, it joins very quickly and returns the result (e.g. it matches onto 5 rows very fast and returns), when i set it to 0.9 must match it takes similar to the original = comparison in UDF. I wonder why the fuzzy match of 0.1 DOES return faster? not necessarily a question, just an observationandrew.butkus
This actually interesting. Can you share some details of implementation?zero323
I have attached some snippets, please ignore bad practices :)andrew.butkus
Sure :) Are you aware that o.a.s.sql.functions provides levenshtein out of the box?zero323

1 Answers

1
votes

So there are three different problems here:

  • Spark optimizes joins by using hashing and sorting so these optimizations are applicable only to equi-joins. Other types of joins, including ones depending on UDFs require pairwise comparisons hence Cartesian product. You can check Why using a UDF in a SQL query leads to cartesian product? for details.
  • Limit operations after data movements, especially shuffles, cannot be fully optimized. You can find a nice explanation in the nice answer to Towards limiting the big RDD provided by Sun Rui.

    Your case is paradoxically simpler due to lack of shuffle but you still have to bring partitions together.

  • Limit optimizations are based on partitions, not records. Spark starts which checking the first partition and, if number of elements satisfying criteria is lower than required, it iterates increasing number of partitions to take with each iteration (as far as I remember the factor is 4). If you're looking for a rare event this can increase pretty fast.