Try this:
1. Load the input
// imports needed for .toDS() and the column functions used below
import org.apache.spark.sql.functions._
import spark.implicits._
val data =
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Will
|2 |null | null
|3 |New York | Norman
|4 |Iowa | Ian
""".stripMargin
// turn the fixed-width, pipe-separated sample into a Dataset[String] of CSV rows
val stringDS = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS)
df1.show(false)
df1.printSchema()
/**
* +------+---------+--------+
* |emp_id|emp_site |emp_name|
* +------+---------+--------+
* |1 |Washigton|Will |
* |2 |null |null |
* |3 |New York |Norman |
* |4 |Iowa |Ian |
* +------+---------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
val data1 =
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Watson
|2 |Wisconsin | Sam
|3 |New York | null
|4 |Illinois | Ican
|5 |Pennsylvania | Patrick
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df2.show(false)
df2.printSchema()
/**
* +------+------------+--------+
* |emp_id|emp_site |emp_name|
* +------+------------+--------+
* |1 |Washigton |Watson |
* |2 |Wisconsin |Sam |
* |3 |New York |null |
* |4 |Illinois |Ican |
* |5 |Pennsylvania|Patrick |
* +------+------------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
2. Process to required format
val joiningKey = "emp_id"
// build one diff expression per non-key column, comparing df1 (old) against df2 (new)
val cols =
  df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
    .map { c =>
      val (df1Col, df2Col) = df1.col(c._1) -> df2.col(c._2)
      when(df1Col.isNull && df2Col.isNotNull,                      // present only in df2 -> insert
        array(map(lit("to"), df2Col), map(lit("change"), lit("insert"))))
        .when(df1Col.isNotNull && df2Col.isNull,                   // present only in df1 -> delete
          array(map(lit("from"), df1Col), map(lit("change"), lit("delete"))))
        .when(df1Col.isNotNull && df2Col.isNotNull && df1Col === df2Col,
          lit(null))                                               // unchanged -> null
        .when(df1Col.isNull && df2Col.isNull,
          lit(null))                                               // missing on both sides -> null
        .otherwise(array(map(lit("from"), df1Col), map(lit("to"), df2Col), map(lit("change"), lit("update"))))
        .as(c._1)
    }
// full outer join on the key, then emit the per-column diffs
df1.join(df2, Seq(joiningKey), "outer")
  .select(cols ++ Seq(col(joiningKey)): _*)
  .orderBy(joiningKey)
  .show(false)
/**
* +------------------------------------------------------+----------------------------------------------------+------+
* |emp_site |emp_name |emp_id|
* +------------------------------------------------------+----------------------------------------------------+------+
* |null |[[from -> Will], [to -> Watson], [change -> update]]|1 |
* |[[to -> Wisconsin], [change -> insert]] |[[to -> Sam], [change -> insert]] |2 |
* |null |[[from -> Norman], [change -> delete]] |3 |
* |[[from -> Iowa], [to -> Illinois], [change -> update]]|[[from -> Ian], [to -> Ican], [change -> update]] |4 |
* |[[to -> Pennsylvania], [change -> insert]] |[[to -> Patrick], [change -> insert]] |5 |
* +------------------------------------------------------+----------------------------------------------------+------+
*/
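If you need the step-2 result beyond show(), the same expressions can be reused and each row rendered as JSON; this is just a sketch (diffDF and the output path are illustrative names, not part of the original answer):
// materialise the diff and render every row as a JSON string
val diffDF = df1.join(df2, Seq(joiningKey), "outer")
  .select(cols ++ Seq(col(joiningKey)): _*)
  .orderBy(joiningKey)
diffDF.toJSON.show(false)
// or persist it, e.g. diffDF.write.mode("overwrite").json("/tmp/emp_diff")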
3. Use a struct if the data type of the input columns is other than string (based on the comments)
// in case column is not of type string
// getExpr builds a named_struct SQL expression holding the from/to values and the change label
val getExpr = (fromExpr: String, toExpr: String, changeExpr: String) =>
  s"named_struct('from', $fromExpr, 'to', $toExpr, 'change', '$changeExpr')"
val cols1 =
  df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
    .map { c =>
      // qualify the column names with the aliases used in the join below
      val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
      when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
        .when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
        .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "null")))
        .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "null")))
        .otherwise(expr(getExpr(c1, c2, "update")))
        .as(c._1)
    }
val processedDF = df1.as("df1").join(df2.as("df2"), Seq(joiningKey), "outer")
  .select(cols1 ++ Seq(col(joiningKey)): _*)
  .orderBy(joiningKey)
processedDF.show(false)
processedDF.printSchema()
/**
* +------------------------+----------------------+------+
* |emp_site |emp_name |emp_id|
* +------------------------+----------------------+------+
* |[,, null] |[Will, Watson, update]|1 |
* |[, Wisconsin, insert] |[, Sam, insert] |2 |
* |[,, null] |[Norman,, delete] |3 |
* |[Iowa, Illinois, update]|[Ian, Ican, update] |4 |
* |[, Pennsylvania, insert]|[, Patrick, insert] |5 |
* +------------------------+----------------------+------+
*
* root
* |-- emp_site: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_name: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_id: integer (nullable = true)
*/
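Since the struct fields are directly addressable, the diff is easy to query further, for example keeping only the rows where at least one column actually changed (a sketch, with the two column names hardcoded for this sample):
processedDF
  .filter("emp_site.change in ('insert','delete','update') or emp_name.change in ('insert','delete','update')")
  .show(false)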
Please note that when from and to are not present, I'm using change as null; you can change that to something else, such as no-op.
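For example, to label the unchanged (or both-null) case as no-op in the struct variant, only the two middle branches need a different literal; colsNoOp below is just an illustrative name and everything else stays the same:
val colsNoOp =
  df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
    .map { c =>
      val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
      when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
        .when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
        // unchanged or missing on both sides -> tagged as no-op instead of null
        .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "no-op")))
        .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "no-op")))
        .otherwise(expr(getExpr(c1, c2, "update")))
        .as(c._1)
    }
// use colsNoOp in place of cols1 in the join/select above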