I want to be able to pass the join condition for two data frames as an input string. The idea is to make the join generic enough that the user can pass whatever condition they like.

Here's how I am doing it right now. Although it works, I think it's not clean.

val testInput = Array("a=b", "c=d")
val condition: Column = testInput.map(x => testMethod(x)).reduce((a,b) => a.and(b))
firstDataFrame.join(secondDataFrame, condition, "fullouter")

Here's testMethod:

def testMethod(inputString: String): Column = {
  val splitted = inputString.split("=")
  col(splitted.apply(0)) === col(splitted.apply(1))
}

I need help figuring out a better way of taking input to generate the join condition dynamically.

1 Answer

I'm not sure a custom method like this would provide much benefit, but if you must go down that path, I would recommend making it also cover joins on:

  1. columns of the same name (which is rather common)
  2. inequality conditions

Sample code below:

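import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Builds a join from conditions of the form "<leftCol> <op> <rightCol>"
// (three whitespace-separated tokens) and combines them with AND.
def joinDFs(dfL: DataFrame, dfR: DataFrame, conditions: List[String], joinType: String): DataFrame = {
  val joinConditions = conditions.map( cond => {
      // trim first so leading whitespace does not produce an empty token
      val arr = cond.trim.split("\\s+")
      if (arr.size != 3) throw new IllegalArgumentException(s"Invalid join condition: '$cond'") else
        // resolve each column against its own DataFrame so that
        // same-named columns on both sides can be told apart
        arr(1) match {
          case "<"  => dfL(arr(0)) <   dfR(arr(2))
          case "<=" => dfL(arr(0)) <=  dfR(arr(2))
          case "="  => dfL(arr(0)) === dfR(arr(2))
          case ">=" => dfL(arr(0)) >=  dfR(arr(2))
          case ">"  => dfL(arr(0)) >   dfR(arr(2))
          case "!=" => dfL(arr(0)) =!= dfR(arr(2))
          case _ => throw new IllegalArgumentException(s"Invalid join condition: '$cond'")
        }
    } ).
    reduce(_ and _)  // note: reduce throws if conditions is empty

  dfL.join(dfR, joinConditions, joinType)
}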

// toDF needs spark.implicits._ in scope (spark-shell imports it automatically)
val dfLeft = Seq(
  (1, "2018-04-01", "p"),
  (1, "2018-04-01", "q"),
  (2, "2018-05-01", "r")
).toDF("id", "date", "value")

val dfRight = Seq(
  (1, "2018-04-15", "x"),
  (2, "2018-04-15", "y")
).toDF("id", "date", "value")

val conditions = List("id = id", "date <= date")

joinDFs(dfLeft, dfRight, conditions, "left_outer").
  show
// +---+----------+-----+----+----------+-----+
// | id|      date|value|  id|      date|value|
// +---+----------+-----+----+----------+-----+
// |  1|2018-04-01|    p|   1|2018-04-15|    x|
// |  1|2018-04-01|    q|   1|2018-04-15|    x|
// |  2|2018-05-01|    r|null|      null| null|
// +---+----------+-----+----+----------+-----+
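
If the input should also accept the compact form from the question ("a=b", with no spaces around the operator), one option is to separate parsing from building the Spark condition. The following is only a sketch under that assumption: JoinCond and JoinCondParser are hypothetical names, not part of Spark, and the regex only accepts plain column names made of letters, digits and underscores.

```scala
// Hypothetical helper type: one parsed condition such as "date <= date".
final case class JoinCond(left: String, op: String, right: String)

object JoinCondParser {
  // Two-character operators come first in the alternation so that
  // "<=" is not read as "<" followed by a stray "=".
  private val Pattern = """^\s*(\w+)\s*(<=|>=|!=|<|>|=)\s*(\w+)\s*$""".r

  // Returns Right(parsed condition) or Left(error message) instead of throwing.
  def parse(s: String): Either[String, JoinCond] = s match {
    case Pattern(l, op, r) => Right(JoinCond(l, op, r))
    case _                 => Left(s"Invalid join condition: '$s'")
  }
}
```

For example, JoinCondParser.parse("a=b") and JoinCondParser.parse("date <= date") both succeed, while JoinCondParser.parse("id ~ id") returns a Left with an error message. Each parsed JoinCond could then be turned into a Column with the same match on the operator that joinDFs uses.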