I'm not sure a custom method like this provides much benefit, but if you do go down that path I would recommend having it also cover joins on:
- columns of the same name (which is rather common)
- inequality conditions
Sample code below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Joins two DataFrames on a list of conditions, each of the form
// "leftCol op rightCol" where op is one of <, <=, =, >=, >, !=.
// Columns are qualified via dfL(...)/dfR(...), so same-named columns
// on both sides don't cause ambiguity errors in the join condition.
def joinDFs(dfL: DataFrame, dfR: DataFrame, conditions: List[String], joinType: String): DataFrame = {
  val joinConditions = conditions.map( cond => {
      val arr = cond.split("\\s+")
      if (arr.size != 3) throw new Exception("Invalid join conditions!") else
        arr(1) match {
          case "<"  => dfL(arr(0)) <   dfR(arr(2))
          case "<=" => dfL(arr(0)) <=  dfR(arr(2))
          case "="  => dfL(arr(0)) === dfR(arr(2))
          case ">=" => dfL(arr(0)) >=  dfR(arr(2))
          case ">"  => dfL(arr(0)) >   dfR(arr(2))
          case "!=" => dfL(arr(0)) =!= dfR(arr(2))
          case _    => throw new Exception("Invalid join conditions!")
        }
    } ).
    reduce(_ and _)  // combine the individual conditions with AND
  dfL.join(dfR, joinConditions, joinType)
}
// assumes a SparkSession `spark` is in scope; in spark-shell the
// implicits below are already imported
import spark.implicits._

val dfLeft = Seq(
  (1, "2018-04-01", "p"),
  (1, "2018-04-01", "q"),
  (2, "2018-05-01", "r")
).toDF("id", "date", "value")

val dfRight = Seq(
  (1, "2018-04-15", "x"),
  (2, "2018-04-15", "y")
).toDF("id", "date", "value")

val conditions = List("id = id", "date <= date")

joinDFs(dfLeft, dfRight, conditions, "left_outer").
  show
// +---+----------+-----+----+----------+-----+
// | id| date|value| id| date|value|
// +---+----------+-----+----+----------+-----+
// | 1|2018-04-01| p| 1|2018-04-15| x|
// | 1|2018-04-01| q| 1|2018-04-15| x|
// | 2|2018-05-01| r|null| null| null|
// +---+----------+-----+----+----------+-----+
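Note that because both sides share the same column names, the joined result contains ambiguous `id`, `date`, and `value` columns, which will bite you if you try to `select` from it afterwards. One option is to prefix the right-hand columns before joining. A minimal sketch below, assuming the setup above; `prefixColumns` and the `r_` prefix are just illustrative choices, not part of any Spark API:

// Hypothetical helper: rename every column of a DataFrame with a prefix
// so the joined result ends up with unique column names.
def prefixColumns(df: DataFrame, prefix: String): DataFrame =
  df.columns.foldLeft(df)((acc, c) => acc.withColumnRenamed(c, prefix + c))

val dfRightPrefixed = prefixColumns(dfRight, "r_")
val conditions2 = List("id = r_id", "date <= r_date")

joinDFs(dfLeft, dfRightPrefixed, conditions2, "left_outer").
  select("id", "date", "value", "r_value").
  show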