
I have the following Spark dataframe where every column (except the primary-key column emp_id) contains a map with keys 'from' and 'to', whose values can be null. I want to evaluate 'from' and 'to' in each column (except emp_id) and add a new key 'change' to the map, whose value is: a) 'insert' if 'from' is null and 'to' is not null, b) 'delete' if 'to' is null and 'from' is not null, c) 'update' if 'from' and 'to' are both non-null and their values differ.

Note: columns whose value is null remain untouched.

Important note: the type of these columns is not Map[String, String] but something like Map[String, Any], meaning the values can be other struct objects.

How can we achieve this in Scala?

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                   |emp_site                          |
|------|---------------------|----------------------------|---------------------|--------------------------|----------------------------------|
|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

Expected:

|emp_id|emp_city                               |emp_name                                      |emp_phone                              |emp_sal                                     |emp_site                                            |
|------|---------------------------------------|----------------------------------------------|---------------------------------------|--------------------------------------------|----------------------------------------------------|
|1     |null                                   |[from -> Will, to -> Watson, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                                   |[from -> Norman, to -> Nate, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from -> , to -> Des Moines, change -> insert]      |
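For reference, the three rules above can be sketched as a plain Scala function; the name `changeOf` is mine, purely for illustration:

```scala
// Classify a (from, to) pair per the rules: insert / delete / update,
// or None when no change entry applies.
def changeOf(from: Option[String], to: Option[String]): Option[String] =
  (from, to) match {
    case (None, Some(_))              => Some("insert")
    case (Some(_), None)              => Some("delete")
    case (Some(f), Some(t)) if f != t => Some("update")
    case _                            => None // equal values or both null: untouched
  }
```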
It is NOT a duplicate; in fact, it's an extension of the question from the link you posted. – ark
Solving the aspect below would be the main difference from the question in the above link: the type of these columns is not Map[String, String] but something like Map[String, Any], meaning the values can be other struct objects. – ark
Columns cannot have an Any / Map[String, Any] type in a DataFrame. Please check again. – QuickSilver

1 Answer


You can do this with a Row mapper function as below; the code explanation is inline.

import org.apache.spark.sql.{Row, SparkSession}

object MapUpdater {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("MapUpdater").getOrCreate()

    import spark.implicits._

    // Load your data. The explicit type annotation lets the null cells
    // encode as Map columns instead of an unencodable Null type.
    val df = List[(Int, Map[String, String], Map[String, String], Map[String, String], Map[String, String], Map[String, String])](
      (1, null, Map("from" -> "Will", "to" -> "Watson"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> null, "to" -> "Seattle")),
      (3, null, Map("from" -> "Norman", "to" -> "Nate"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> "CherryHill", "to" -> "Newark")),
      (4, Map("from" -> null, "to" -> "Iowa"), Map("from" -> null, "to" -> "Ian"), Map("from" -> null, "to" -> "1004"), Map("from" -> null, "to" -> "8000"), Map("from" -> null, "to" -> "Des Moines"))
    ).toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")

    // Map each row, rebuilding every map column with the "change" key added
    df.map(row => {

      val new_emp_city = mapUpdater(row, 1)
      val new_emp_name = mapUpdater(row, 2)
      val new_emp_phone = mapUpdater(row, 3)
      val new_emp_sal = mapUpdater(row, 4)
      val new_emp_site = mapUpdater(row, 5)

      (row.getInt(0), new_emp_city, new_emp_name, new_emp_phone, new_emp_sal, new_emp_site)

    }).toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")
      .show(false)

  }

  // Row mapper function: adds a "change" entry to a non-null map column
  private def mapUpdater(row: Row, colId: Int): Map[String, String] = {
    val old_map = row.getAs[Map[String, String]](colId)

    if (old_map == null) old_map
    else {
      val from = old_map.getOrElse("from", null)
      val to = old_map.getOrElse("to", null)
      if (from == null && to != null) old_map + ("change" -> "insert")
      else if (from != null && to == null) old_map + ("change" -> "delete")
      else if (from != null && to != null && from != to) old_map + ("change" -> "update")
      else old_map
    }
  }
}
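The per-map rule can also be sanity-checked without a Spark session. This is a minimal, Spark-free sketch (the helper name `addChange` is mine) that applies the same insert/delete/update classification directly to a plain Map with possibly-null values:

```scala
// Add a "change" entry to a map with possibly-null "from"/"to" values;
// null maps and unchanged values pass through untouched.
def addChange(m: Map[String, String]): Map[String, String] =
  if (m == null) m
  else (Option(m.getOrElse("from", null)), Option(m.getOrElse("to", null))) match {
    case (None, Some(_))              => m + ("change" -> "insert")
    case (Some(_), None)              => m + ("change" -> "delete")
    case (Some(f), Some(t)) if f != t => m + ("change" -> "update")
    case _                            => m
  }
```

Keeping this logic in a pure function makes it easy to unit-test before wiring it into the Row mapper.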