I am trying to run a query in Hive.
Here is the simplest setup (I know I could use = here, but I am using a custom UDF that does more than an equality comparison).
Datasets a and b have around 30,000 rows each:
SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5
where Custom_UDF_Equals_Comparison simply does an equality check, a.id = b.id.
When I run this query, the log output shows a lot of M/R tasks running. I assume it is comparing across both datasets until every possible combination has been compared, which is far beyond the LIMIT of 5. I would expect only a handful of M/R tasks, since I know most of the data can be joined within the first few rows of each table. Why does this happen, and how can I fix it?
edit:
Hi zero323, that is a similar question but not exactly the same. It explains why a full comparison between two RDDs is performed when a UDF is used for the comparison, but it doesn't explain why LIMIT doesn't stop the comparison once 5 rows have been found. For example, if 5 rows are found in the first 10 join attempts, why does it go on to the remaining 30,000 * 30,000 attempts? Is it because the limit is applied after all the joins occur, e.g. it joins 30,000 * 30,000 rows and then reduces them to just 5?
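To make concrete what I am asking, here is a plain-Scala sketch of the two behaviours (no Hive or Spark involved; LimitSketch, Counter and udfEquals are names made up for illustration, and udfEquals is only a stand-in for my real UDF). It says nothing about what the query planner actually does; it only shows the difference between applying the limit after all comparisons and stopping once 5 matches exist.

```scala
object LimitSketch {
  // Records how many times the predicate was evaluated.
  final class Counter { var calls: Long = 0L }

  // Hypothetical stand-in for the custom UDF: plain equality, plus call counting.
  def udfEquals(c: Counter)(x: Int, y: Int): Boolean = {
    c.calls += 1
    x == y
  }

  // "Limit applied last": every pair is compared, then the first `limit` matches are kept.
  def eagerJoin(a: Vector[Int], b: Vector[Int], limit: Int, c: Counter): List[(Int, Int)] = {
    val all = for (x <- a; y <- b if udfEquals(c)(x, y)) yield (x, y)
    all.take(limit).toList
  }

  // "Short-circuit": pairs are generated lazily and evaluation stops after `limit` matches.
  def lazyJoin(a: Vector[Int], b: Vector[Int], limit: Int, c: Counter): List[(Int, Int)] = {
    val pairs = for (x <- a.iterator; y <- b.iterator if udfEquals(c)(x, y)) yield (x, y)
    pairs.take(limit).toList
  }
}
```

Both functions return the same 5 pairs, but eagerJoin always evaluates the predicate a.size * b.size times, while lazyJoin stops early. My question is essentially whether the query behaves like eagerJoin, and if so, why.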
edit2:
def levenshtein(str1: String, str2: String): Int = {
  val lenStr1 = str1.length
  val lenStr2 = str2.length
  val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)
  for (i <- 0 to lenStr1) d(i)(0) = i
  for (j <- 0 to lenStr2) d(0)(j) = j
  for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
    val cost = if (str1(i - 1) == str2(j - 1)) 0 else 1
    d(i)(j) = min(
      d(i - 1)(j) + 1,       // deletion
      d(i)(j - 1) + 1,       // insertion
      d(i - 1)(j - 1) + cost // substitution
    )
  }
  d(lenStr1)(lenStr2)
}

def min(nums: Int*): Int = nums.min

def join_views(joinType: String, parameters: Any, col1: Any, col2: Any): Boolean = {
  if (joinType == "Equals") {
    if (col1 == null || col2 == null) {
      return false
    }
    return col1 == col2
  }
  else if (joinType == "Fuzzy_String") {
    if (col1 == null || col2 == null) {
      return false
    }
    val val1 = col1.asInstanceOf[String]
    val val2 = col2.asInstanceOf[String]
    val ratio = Utils.distancePercentage(val1, val2)
    if (ratio == 1.0) {
      return val1 == val2
    }
    return (ratio >= parameters.asInstanceOf[Double])
  }
  return false
}

... ON join_views("Fuzzy_String", "0.1", a.col1, b.col1) LIMIT 5 takes 20 secs
... ON join_views("Fuzzy_String", "0.9", a.col1, b.col1) LIMIT 5 takes 100 secs
o.a.s.sql.functions provides levenshtein out of the box? – zero323