1 vote

I have two Spark datasets: one with columns accountid and key, where the key column is an array of keys ([key1, key2, key3, ...]), and another with columns accountid and result, where result is a JSON string of key/value pairs (accountid, {key: value, key: value, ...}). I need to update the value in the second dataset whenever a key appears for that accountid in the first dataset.

import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
  ("20180610114049", "id1", "key1"),
  ("20180610114049", "id2", "key2"),
  ("20180610114049", "id1", "key1"),
  ("20180612114049", "id2", "key1"),
  ("20180613114049", "id3", "key2"),
  ("20180613114049", "id3", "key3")
)).toDF("date", "accountid", "key")

val gp = df.groupBy("accountid", "date").agg(collect_list("key"))

+---------+--------------+-----------------+
|accountid|          date|collect_list(key)|
+---------+--------------+-----------------+
|      id2|20180610114049|           [key2]|
|      id1|20180610114049|     [key1, key1]|
|      id3|20180613114049|     [key2, key3]|
|      id2|20180612114049|           [key1]|
+---------+--------------+-----------------+


val df2= sc.parallelize(Seq(("20180610114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180610114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180611114049", "id1","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180612114049", "id2","{'key1':'0.0','key2':'0.0','key3':'0.0'}"),
  ("20180613114049", "id3","{'key1':'0.0','key2':'0.0','key3':'0.0'}")
 )).toDF("date","accountid", "result")

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
+--------------+---------+----------------------------------------+

Expected output:

+--------------+---------+----------------------------------------+
|date          |accountid|result                                  |
+--------------+---------+----------------------------------------+
|20180610114049|id1      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180610114049|id2      |{'key1':'0.0','key2':'1.0','key3':'0.0'}|
|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|
|20180612114049|id2      |{'key1':'1.0','key2':'0.0','key3':'0.0'}|
|20180613114049|id3      |{'key1':'0.0','key2':'1.0','key3':'1.0'}|
+--------------+---------+----------------------------------------+

2 Answers

1 vote

You will most definitely need a UDF to do it cleanly here.

After joining on date and accountid, you can pass both the array and the JSON string to the UDF. Inside the UDF, parse the JSON with the parser of your choice (I'm using JSON4S in the example), set the value to 1.0 for every key that exists in the array, serialize the map back to JSON, and return it.

val gp=df.groupBy("accountid","date").agg(collect_list("key").as("key"))

val joined = df2.join(gp, Seq("date", "accountid") , "left_outer")

joined.show(false)
//+--------------+---------+----------------------------------------+------------+
//|date          |accountid|result                                  |key         |
//+--------------+---------+----------------------------------------+------------+
//|20180610114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2]      |
//|20180613114049|id3      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key2, key3]|
//|20180610114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1, key1]|
//|20180611114049|id1      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|null        |
//|20180612114049|id2      |{'key1':'0.0','key2':'0.0','key3':'0.0'}|[key1]      |
//+--------------+---------+----------------------------------------+------------+

// the UDF that will do the most work
// it's important to declare `formats` inside the function
// to avoid object not Serializable exception
// Not all cases are covered, use with caution :D
val convertJsonValues = udf{(json: String, arr: Seq[String]) =>
    import org.json4s.jackson.JsonMethods._
    import org.json4s.JsonDSL._
    implicit val formats = org.json4s.DefaultFormats
    // replace single quotes with double
    val kvMap = parse(json.replaceAll("'", """"""")).values.asInstanceOf[Map[String,String]]
    val updatedKV = kvMap.map{ case(k,v) => if(arr.contains(k)) (k,"1.0") else (k,v) }
    compact(render(updatedKV))
}

// Use when-otherwise and send empty array where `key` is null
joined.select($"date", 
              $"accountid",
              when($"key".isNull, convertJsonValues($"result", array()))
               .otherwise(convertJsonValues($"result", $"key"))
               .as("result")
              ).show(false)

//+--------------+---------+----------------------------------------+
//|date          |accountid|result                                  |
//+--------------+---------+----------------------------------------+
//|20180610114049|id2      |{"key1":"0.0","key2":"1.0","key3":"0.0"}|
//|20180613114049|id3      |{"key1":"0.0","key2":"1.0","key3":"1.0"}|
//|20180610114049|id1      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
//|20180611114049|id1      |{"key1":"0.0","key2":"0.0","key3":"0.0"}|
//|20180612114049|id2      |{"key1":"1.0","key2":"0.0","key3":"0.0"}|
//+--------------+---------+----------------------------------------+
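
If you want to sanity-check the JSON4S parsing and updating logic outside of Spark first, you can run the body of the UDF as a plain function on the driver. This is just an illustrative check with a hard-coded sample string and key list, not part of the pipeline above:

import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
implicit val formats = org.json4s.DefaultFormats

val sample = "{'key1':'0.0','key2':'0.0','key3':'0.0'}"
val keys = Seq("key1")   // stand-in for the collected keys of one accountid/date
val kvMap = parse(sample.replaceAll("'", "\"")).values.asInstanceOf[Map[String, String]]
val updated = kvMap.map { case (k, v) => if (keys.contains(k)) (k, "1.0") else (k, v) }
println(compact(render(updated)))   // {"key1":"1.0","key2":"0.0","key3":"0.0"}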
1 vote

You can achieve your requirement by using a udf function after joining both dataframes. There is, of course, some extra work involved, such as converting the json string to a struct, converting the struct back to json, and using a case class (comments are provided in the code for further explanation).

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

//aliasing the collected keys
val gp = df.groupBy("accountid","date").agg(collect_list("key").as("keys"))

//schema for converting the json string to a struct
val schema = StructType(Seq(StructField("key1", StringType, true), StructField("key2", StringType, true), StructField("key3", StringType, true)))

//udf function to update the values of the struct, where result is a case class
//the null check covers rows that get no matching keys from the left join
def updateKeysUdf = udf((arr: Seq[String], json: Row) => Seq(json.schema.fieldNames.map(key => if(arr != null && arr.contains(key)) "1.0" else json.getAs[String](key))).collect{case Array(a,b,c) => result(a,b,c)}.toList(0))

//changing the json string to a struct using the above schema
df2.withColumn("result", from_json(col("result"), schema))
  .as("df2")   //aliasing df2 for joining and selecting
  .join(gp.as("gp"), col("df2.accountid") === col("gp.accountid") && col("df2.date") === col("gp.date"), "left")   //aliasing gp dataframe and joining on accountid and date
  .select(col("df2.accountid"), col("df2.date"), to_json(updateKeysUdf(col("gp.keys"), col("df2.result"))).as("result"))  //selecting, calling the above udf function and finally converting back to a json string
  .show(false)

where result is a case class

case class result(key1: String, key2: String, key3: String)

which should give you

+---------+--------------+----------------------------------------+
|accountid|date          |result                                  |
+---------+--------------+----------------------------------------+
|id1      |20180610114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
|id2      |20180610114049|{"key1":"0.0","key2":"1.0","key3":"0.0"}|
|id1      |20180611114049|{"key1":"0.0","key2":"0.0","key3":"0.0"}|
|id2      |20180612114049|{"key1":"1.0","key2":"0.0","key3":"0.0"}|
|id3      |20180613114049|{"key1":"0.0","key2":"1.0","key3":"1.0"}|
+---------+--------------+----------------------------------------+

I hope the answer is helpful.