
I have two DataFrames with the following columns:

DF1 - partitionNum, lowerBound, upperBound

DF2 - ID, cumulativeCount

I want a resulting frame which has ID and partitionNum.

I have done a cross join, which is performing badly, as below:

DF2.crossJoin(DF1)
  .where(col("cumulativeCount").between(col("lowerBound"), col("upperBound")))
  .orderBy("cumulativeCount")
  .select("ID", "partitionNum")

Since DF2 has 5 million rows and DF1 has 50 rows, this cross join yields 250 million rows and the job is dying. How can I turn this into a select where the resulting frame has ID from DF2 and partitionNum from DF1, with the condition: select partitionNum from DF1 where cumulativeCount of DF2 is between lowerBound and upperBound of DF1?

I am looking for something like the below; will this work?

sparkSession.sqlContext.sql("SELECT ID, cumulativeCount, A.partitionNum FROM CumulativeCountViewById WHERE cumulativeCount IN " + "(SELECT partitionNum FROM CumulativeRangeView WHERE cumulativeCount BETWEEN lowerBound and upperBound) AS A")


1 Answer


Try this.

Solution: you don't need a cross join. Since your DF1 is only 50 rows, convert it to a Map with key: partitionNum and value: Tuple2(lowerBound, upperBound). Then create a UDF which takes a number (your cumulativeCount) and checks it against the map, returning the keys (i.e., partitionNums) for which lowerBound < cumulativeCount < upperBound.
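
With just 50 rows, capturing the map in the UDF closure (as the transcript below does) is fine; for a larger lookup table, a broadcast variable is the more idiomatic way to ship it to the executors. A minimal sketch of that variant, assuming "spark" is your SparkSession and "boundsBc" is an illustrative name:

import org.apache.spark.sql.functions.udf

// Broadcast the small lookup map so each executor keeps one read-only copy
// instead of having it serialized into every task closure.
val boundsBc = spark.sparkContext.broadcast(
  DF1.collect.map(row => row.getInt(0) -> (row.getInt(1), row.getInt(2))).toMap)

// Same check as in the transcript below: strictly inside (lowerBound, upperBound).
val myUdf = udf((num: Int) =>
  boundsBc.value.filter { case (_, (lb, ub)) => lb < num && num < ub })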

You can also edit the UDF to return only the partition numbers and explode the resulting "partNums" array column; a sketch of that variant follows the output below.

scala> DF1.show
+------------+----------+----------+
|partitionNum|lowerBound|upperBound|
+------------+----------+----------+
|           1|        10|        20|
|           2|         5|        10|
|           3|         6|        15|
|           4|         8|        20|
+------------+----------+----------+


scala> DF2.show
+---+---------------+
| ID|cumulativeCount|
+---+---------------+
|100|              5|
|100|             10|
|100|             15|
|100|             20|
|100|             25|
|100|             30|
|100|              6|
|100|             12|
|100|             18|
|100|             24|
|101|              1|
|101|              2|
|101|              3|
|101|              4|
|101|              5|
|101|              6|
|101|              7|
|101|              8|
|101|              9|
|101|             10|
+---+---------------+


scala> val smallData = DF1.collect.map(row => row.getInt(0) -> (row.getInt(1), row.getInt(2))).toMap
smallData: scala.collection.immutable.Map[Int,(Int, Int)] = Map(1 -> (10,20), 2 -> (5,10), 3 -> (6,15), 4 -> (8,20))

scala> val myUdf = udf((num:Int) => smallData.filter((v) => v._2._2 > num && num > v._2._1))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false)),true),Some(List(IntegerType)))

scala> DF2.withColumn("partNums", myUdf($"cumulativeCount")).show(false)
+---+---------------+-------------------------------------------+
|ID |cumulativeCount|partNums                                   |
+---+---------------+-------------------------------------------+
|100|5              |[]                                         |
|100|10             |[3 -> [6, 15], 4 -> [8, 20]]               |
|100|15             |[1 -> [10, 20], 4 -> [8, 20]]              |
|100|20             |[]                                         |
|100|25             |[]                                         |
|100|30             |[]                                         |
|100|6              |[2 -> [5, 10]]                             |
|100|12             |[1 -> [10, 20], 3 -> [6, 15], 4 -> [8, 20]]|
|100|18             |[1 -> [10, 20], 4 -> [8, 20]]              |
|100|24             |[]                                         |
|101|1              |[]                                         |
|101|2              |[]                                         |
|101|3              |[]                                         |
|101|4              |[]                                         |
|101|5              |[]                                         |
|101|6              |[2 -> [5, 10]]                             |
|101|7              |[2 -> [5, 10], 3 -> [6, 15]]               |
|101|8              |[2 -> [5, 10], 3 -> [6, 15]]               |
|101|9              |[2 -> [5, 10], 3 -> [6, 15], 4 -> [8, 20]] |
|101|10             |[3 -> [6, 15], 4 -> [8, 20]]               |
+---+---------------+-------------------------------------------+