1 vote

I have my "Structured data" as shown below, and I need to transform it into the "Expected results" format shown further down. My "Output schema" is shown as well. I'd appreciate some help on how I can achieve this using Spark Scala code.

Note: Grouping on the structured data is to be done on the columns SN and VIN. There should be one row for each distinct (SN, VIN) pair; if either SN or VIN changes, the data should go into the next row.

Structured data:

+-----------------+-------------+--------------------+---+
|VIN              |ST           |SV                  |SN |
+-----------------+-------------+--------------------+---+
|FU74HZ501740XXXXX|1566799999225|44.0                |APP|
|FU74HZ501740XXXXX|1566800002758|61.0                |APP|
|FU74HZ501740XXXXX|1566800009446|23.39               |ASP|
+-----------------+-------------+--------------------+---+

Expected results:

(Expected results were attached as an image: one row per (VIN, SN) pair, with the SN, ST, and SV values collected into a single EVENTS array column.)

Output schema:

import org.apache.spark.sql.types._

val outputSchema = StructType(
  List(
    StructField("VIN", StringType, true),
    StructField("EVENTS", ArrayType(
        StructType(Array(
          StructField("SN", StringType, true),
          // ST holds epoch-millisecond timestamps (e.g. 1566799999225),
          // which overflow IntegerType, so LongType is used here
          StructField("ST", LongType, true),
          StructField("SV", DoubleType, true)
        ))))
  )
)
Comments:
Please add text instead of images in your question. It makes it easier for us to reproduce the problem based on your current dataset. – philantrovert

Which column are you grouping by here? SN? – Shaido

Hello @Shaido, yes, grouping should be done on column SN... I want to loop through the SN column and include SN, ST, SV within the single array of column EVENTS, and VIN as another column as shown in the expected results. – Anil Kumar

@AnilKumarKB: What would happen if SN has different values for the same VIN? For example, in your example, if the VIN in the second row is different from the first row. – Shaido

Hello @Shaido, sorry for the confusion, it should be grouped by SN and VIN, e.g. one row for the same SN and VIN. If either SN or VIN changes, then the data should be present in the next row. – Anil Kumar

2 Answers

3 votes

From Spark 2.1, you can achieve this using struct and collect_list.

import spark.implicits._

// Sample data matching the question's structure (ST kept as Double here)
val df_2 = Seq(
  ("FU74HZ501740XXXX",1566799999225.0,44.0,"APP"),
  ("FU74HZ501740XXXX",1566800002758.0,61.0,"APP"),
  ("FU74HZ501740XXXX",1566800009446.0,23.39,"ASP")
).toDF("vin","st","sv","sn")

df_2.show(false)
+----------------+-----------------+-----+---+
|vin             |st               |sv   |sn |
+----------------+-----------------+-----+---+
|FU74HZ501740XXXX|1.566799999225E12|44.0 |APP|
|FU74HZ501740XXXX|1.566800002758E12|61.0 |APP|
|FU74HZ501740XXXX|1.566800009446E12|23.39|ASP|
+----------------+-----------------+-----+---+

Use collect_list with struct:

import org.apache.spark.sql.functions.{col, collect_list, struct, to_json}

df_2.groupBy("vin","sn")
  .agg(collect_list(struct($"st", $"sv", $"sn")).as("events"))
  .withColumn("events", to_json($"events"))
  .drop(col("sn"))
  .show(false)

This will give the wanted output:

+----------------+---------------------------------------------------------------------------------------------+
|vin             |events                                                                                       |
+----------------+---------------------------------------------------------------------------------------------+
|FU74HZ501740XXXX|[{"st":1.566800009446E12,"sv":23.39,"sn":"ASP"}]                                             |
|FU74HZ501740XXXX|[{"st":1.566799999225E12,"sv":44.0,"sn":"APP"},{"st":1.566800002758E12,"sv":61.0,"sn":"APP"}]|
+----------------+---------------------------------------------------------------------------------------------+
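
Note that to_json converts the events column into a JSON string. If you instead need events typed as the ArrayType(StructType(...)) from the question's output schema, a minimal sketch (same df_2 and imports as above, with the struct fields reordered to SN, ST, SV to match the schema) is to simply skip the to_json step:

// Keep events as an array of structs instead of a JSON string,
// matching the ArrayType(StructType(...)) from the question's output schema
val grouped = df_2.groupBy("vin", "sn")
  .agg(collect_list(struct($"sn", $"st", $"sv")).as("events"))
  .drop(col("sn"))

grouped.printSchema()  // events: array<struct<sn:string,st:double,sv:double>>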
1 vote

You can read the input JSON file into a DataFrame via SparkSession:

val df = spark.read.json("/path/to/json/file/test.json")

Here, spark is the SparkSession object.
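
To make this concrete, here is a minimal, self-contained sketch combining the read with the grouping from the accepted answer. The file path, local master, and the assumption that test.json has one record per line with VIN, ST, SV, and SN fields are all illustrative, not from the original answer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

object GroupEvents {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupEvents")
      .master("local[*]") // assumption: local run for testing
      .getOrCreate()
    import spark.implicits._

    // Assumption: test.json holds one JSON record per line with VIN, ST, SV, SN fields
    val df = spark.read.json("/path/to/json/file/test.json")

    // Same grouping idea as the accepted answer
    val result = df.groupBy("VIN", "SN")
      .agg(collect_list(struct($"SN", $"ST", $"SV")).as("EVENTS"))
      .drop("SN")

    result.show(false)
    spark.stop()
  }
}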