
Given two DataFrames.

DF1 is :

|emp_id|emp_site             |emp_name  

|1     |Washigton            | [Will, Smith] 
|2     |null                 | null
|3     |New York             | [Norman, Smith]
|4     |Iowa                 | [Ian, Smith]

DF2 is :

|emp_id|emp_site             |emp_name  

|1     |Washigton            | [Watson, Smith] 
|2     |Wisconsin            | [Sam, Robinson]
|3     |New York             | null
|4     |Illinois             | [Ican, Robinson]
|5     |Pennsylvania         | [Patrick, Robinson]

The expected result is DF3, a join of DF1 and DF2 on the emp_id column (an outer join, so that all DF2 records are present). Every DF3 column except emp_id should be represented as 'from', 'to' and 'change', but ONLY IF the corresponding values in DF1 and DF2 differ (NOTE: if the values from DF1 and DF2 are equal, the column should be null).

Note: 'from' and 'to' are self-explanatory and hold the values from DF1 and DF2 respectively. 'change' is 'insert' if the 'from' value is null; otherwise it is 'update'.
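
The rule in the note above can be written down as a small plain-Scala sketch, independent of Spark. The names `ChangeRule` and `changeType` are hypothetical and only illustrate the intended semantics; a null column value is modelled as `None`.

```scala
object ChangeRule {
  // Hypothetical helper (not part of the original post): classifies one column's change.
  def changeType[A](from: Option[A], to: Option[A]): Option[String] =
    if (from == to) None                  // equal values, or both missing: column stays null
    else if (from.isEmpty) Some("insert") // no value on the DF1 side
    else Some("update")                   // everything else, including a missing DF2 value

  def main(args: Array[String]): Unit = {
    println(changeType(Some("Iowa"), Some("Illinois")))     // Some(update)
    println(changeType(None, Some("Wisconsin")))            // Some(insert)
    println(changeType(Some("New York"), Some("New York"))) // None
  }
}
```

Under this reading, a value that exists in DF1 but is null in DF2 also counts as 'update', since the note only distinguishes on whether 'from' is null.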

|emp_id|emp_site                                       |emp_name

|1     |null                                           |[from -> [Will, Smith], to -> [Watson, Smith], change -> update]
|2     |[to -> Wisconsin, change -> insert]            |[to -> [Sam, Robinson], change -> insert]
|3     |null                                           |[from -> [Norman, Smith], change -> update]
|4     |[from -> Iowa, to -> Illinois, change -> update]|[from -> [Ian, Smith], to -> [Ican, Robinson], change -> update]
|5     |[to -> Pennsylvania, change -> insert]         |[to -> [Patrick, Robinson], change -> insert]

I had no success building this DF3 with a Map, mainly because the column types are not always strings: 'from' and 'to' may hold struct types, depending on the column types in DF1 and DF2. The reason for trying a map to hold 'from', 'to' and 'change' is that DF3 eventually needs to be translated to JSON.

Any help is much appreciated.

I've seen a similar query in the past. - Som
I still need help on this, as the suggestions have limitations with using maps. - ark
Can you please check my edited answer? - Som

1 Answer


Try this:

1. Load the input

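    // assumes an active SparkSession named `spark`
    import org.apache.spark.sql.functions._
    import spark.implicits._ // needed for .toDS()

    val data =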
      """
        |emp_id|emp_site             |emp_name
        |1     |Washigton            | Will
        |2     |null                 | null
        |3     |New York             | Norman
        |4     |Iowa                 | Ian
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)
    df1.show(false)
    df1.printSchema()
    /**
      * +------+---------+--------+
      * |emp_id|emp_site |emp_name|
      * +------+---------+--------+
      * |1     |Washigton|Will    |
      * |2     |null     |null    |
      * |3     |New York |Norman  |
      * |4     |Iowa     |Ian     |
      * +------+---------+--------+
      *
      * root
      * |-- emp_id: integer (nullable = true)
      * |-- emp_site: string (nullable = true)
      * |-- emp_name: string (nullable = true)
      */


    val data1 =
      """
        |emp_id|emp_site             |emp_name
        |1     |Washigton            | Watson
        |2     |Wisconsin            | Sam
        |3     |New York             | null
        |4     |Illinois             | Ican
        |5     |Pennsylvania         | Patrick
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df2.show(false)
    df2.printSchema()
    /**
      * +------+------------+--------+
      * |emp_id|emp_site    |emp_name|
      * +------+------------+--------+
      * |1     |Washigton   |Watson  |
      * |2     |Wisconsin   |Sam     |
      * |3     |New York    |null    |
      * |4     |Illinois    |Ican    |
      * |5     |Pennsylvania|Patrick |
      * +------+------------+--------+
      *
      * root
      * |-- emp_id: integer (nullable = true)
      * |-- emp_site: string (nullable = true)
      * |-- emp_name: string (nullable = true)
      */

2. Process to required format

    val joiningKey = "emp_id"
    // Pair up the non-key columns of df1 and df2 (relies on both frames listing them in the same order)
    val cols =
      df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
        .map { case (c1, c2) =>
          val (df1Col, df2Col) = df1.col(c1) -> df2.col(c2)
          // value only in df2: insert
          when(df1Col.isNull && df2Col.isNotNull,
            array(map(lit("to"), df2Col), map(lit("change"), lit("insert"))))
            // value only in df1: delete
            .when(df1Col.isNotNull && df2Col.isNull,
              array(map(lit("from"), df1Col), map(lit("change"), lit("delete"))))
            // equal values, or both null: no change
            .when(df1Col.isNotNull && df2Col.isNotNull && df1Col === df2Col, lit(null))
            .when(df1Col.isNull && df2Col.isNull, lit(null))
            // both present but different: update
            .otherwise(array(map(lit("from"), df1Col), map(lit("to"), df2Col), map(lit("change"), lit("update"))))
            .as(c1)
        }

    df1.join(df2, Seq(joiningKey), "outer")
      .select(cols ++ Seq(col(colName = joiningKey)): _*)
      .orderBy(joiningKey)
      .show(false)

    /**
      * +------------------------------------------------------+----------------------------------------------------+------+
      * |emp_site                                              |emp_name                                            |emp_id|
      * +------------------------------------------------------+----------------------------------------------------+------+
      * |null                                                  |[[from -> Will], [to -> Watson], [change -> update]]|1     |
      * |[[to -> Wisconsin], [change -> insert]]               |[[to -> Sam], [change -> insert]]                   |2     |
      * |null                                                  |[[from -> Norman], [change -> delete]]              |3     |
      * |[[from -> Iowa], [to -> Illinois], [change -> update]]|[[from -> Ian], [to -> Ican], [change -> update]]   |4     |
      * |[[to -> Pennsylvania], [change -> insert]]            |[[to -> Patrick], [change -> insert]]               |5     |
      * +------------------------------------------------------+----------------------------------------------------+------+
      */

3. Use a struct if the datatype of an input column is other than string (based on the comments)

  // in case column is not of type string
    val getExpr = (fromExpr: String, toExpr: String, changeExpr: String) =>
      s"named_struct('from', $fromExpr, 'to', $toExpr, 'change', '$changeExpr')"
    val cols1 =
      df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
        .map { c =>
          val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
          when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
            .when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
            .when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "null")))
            .when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "null")))
            .otherwise(expr(getExpr(c1, c2, "update")))
            .as(c._1)
        }

    val processedDF = df1.as("df1").join(df2.as("df2"), Seq(joiningKey), "outer")
      .select(cols1 ++ Seq(col(colName = joiningKey)): _*)
      .orderBy(joiningKey)
    processedDF.show(false)
    processedDF.printSchema()

    /**
      * +------------------------+----------------------+------+
      * |emp_site                |emp_name              |emp_id|
      * +------------------------+----------------------+------+
      * |[,, null]               |[Will, Watson, update]|1     |
      * |[, Wisconsin, insert]   |[, Sam, insert]       |2     |
      * |[,, null]               |[Norman,, delete]     |3     |
      * |[Iowa, Illinois, update]|[Ian, Ican, update]   |4     |
      * |[, Pennsylvania, insert]|[, Patrick, insert]   |5     |
      * +------------------------+----------------------+------+
      *
      * root
      * |-- emp_site: struct (nullable = false)
      * |    |-- from: string (nullable = true)
      * |    |-- to: string (nullable = true)
      * |    |-- change: string (nullable = false)
      * |-- emp_name: struct (nullable = false)
      * |    |-- from: string (nullable = true)
      * |    |-- to: string (nullable = true)
      * |    |-- change: string (nullable = false)
      * |-- emp_id: integer (nullable = true)
      */
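
If the whole column should be null when nothing changed (as the question asks), a variation of the struct approach could look like the sketch below. It is only a sketch under some assumptions: `NullableDiffSketch`, `diffCol` and `diff` are hypothetical names, both frames are assumed to share the same non-key column names, and 'change' follows the question's insert/update rule rather than the insert/delete/update split used above.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct, when}

object NullableDiffSketch {
  // Hypothetical helper: a from/to/change struct, or null when both sides match.
  def diffCol(name: String): Column = {
    val from = col(s"df1.$name")
    val to = col(s"df2.$name")
    val change = when(from.isNull, lit("insert")).otherwise(lit("update"))
    // <=> is null-safe equality, so two nulls count as "no change";
    // `when` without `otherwise` yields null for rows that do not match.
    when(!(from <=> to), struct(from.as("from"), to.as("to"), change.as("change"))).as(name)
  }

  // Assumes df1 and df2 have the same column names apart from ordering.
  def diff(df1: DataFrame, df2: DataFrame, key: String): DataFrame = {
    val valueCols = df2.columns.filterNot(_ == key)
    df1.as("df1").join(df2.as("df2"), Seq(key), "outer")
      .select(col(key) +: valueCols.map(diffCol): _*)
      .orderBy(key)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
    import spark.implicits._

    val df1 = Seq[(Int, Option[String])]((1, Some("Washigton")), (2, None), (4, Some("Iowa")))
      .toDF("emp_id", "emp_site")
    val df2 = Seq[(Int, Option[String])]((1, Some("Washigton")), (2, Some("Wisconsin")), (4, Some("Illinois")))
      .toDF("emp_id", "emp_site")

    val result = diff(df1, df2, "emp_id")
    result.show(false)
    result.printSchema()
    spark.stop()
  }
}
```

Because `struct` takes the field types from the input columns, the same helper should also work when the compared columns are arrays or structs, as long as Spark can compare them for equality (map columns cannot be compared this way).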

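Please note that when there is no difference (the values are equal, or both are null), getExpr sets change to the string 'null' rather than a SQL null, because it wraps the value in quotes. You can change that to something else, such as no-op.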
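
Since the result eventually has to become JSON, the struct form can be serialised with `Dataset.toJSON`. The snippet below is a minimal sketch: `DiffToJsonSketch` and the one-row `diff` frame are made up for illustration and merely stand in for a frame shaped like processedDF.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, struct}

object DiffToJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("diff-json").getOrCreate()
    import spark.implicits._

    // Illustrative stand-in for processedDF: one key plus one from/to/change struct column
    val diff = Seq((4, "Iowa", "Illinois")).toDF("emp_id", "from", "to")
      .select($"emp_id", struct($"from", $"to", lit("update").as("change")).as("emp_site"))

    // toJSON returns a Dataset[String] with one JSON document per row
    diff.toJSON.collect().foreach(println)
    spark.stop()
  }
}
```

Each struct column becomes a nested JSON object keyed by its field names, so no map type is needed. Fields whose value is null are typically left out of the generated JSON, which is worth checking against the consumer of the output.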