1 vote

I have a column in a DataFrame that contains nested JSON as a string:

val df=Seq(("""{"-1":{"-1":[ 7420,0,20,22,0,0]}}""" ), ("""{"-1":{"-1":[1006,2,18,10,0,0]}}"""), ("""{"-1":{"-1":[6414,0,17,11,0,0]}}""")).toDF("column1")


+-------------------------------------+
|                              column1|           
+-------------------------------------+
|{"-1":{"-1":[7420, 0, 20, 22, 0, 0]}}|
|{"-1":{"-1":[1006, 2, 18, 10, 0, 0]}}|
|{"-1":{"-1":[6414, 0, 17, 11, 0, 0]}}|
+-------------------------------------+

I want to get a DataFrame that looks like this:

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
|  -1|  -1|7420|   0|  20|  22|   0|   0|
|  -1|  -1|1006|   2|  18|  10|   0|   0|
|  -1|  -1|6414|   0|  17|  11|   0|   0|
+----+----+----+----+----+----+----+----+

I first applied get_json_object, which gave me:

val df1 = df.select(get_json_object($"column1", "$.-1"))

+------------------------------+
|                       column1|           
+------------------------------+
|{"-1":[7420, 0, 20, 22, 0, 0]}|
|{"-1":[1006, 2, 18, 10, 0, 0]}|
|{"-1":[6414, 0, 17, 11, 0, 0]}|
+------------------------------+

so I lost the first element (the outer "-1" key).

I tried to convert the existing elements into the format I wanted with this schema:

val schema = new StructType()
  .add("-1",
    MapType(
      StringType,
      new StructType()
        .add("a1", StringType)
        .add("a2", StringType)
        .add("a3", StringType)
        .add("a4", StringType)
        .add("a5", StringType)
        .add("a6", StringType)
        .add("a7", StringType)
        .add("a8", StringType)
        .add("a9", StringType)
        .add("a10", StringType)
        .add("a11", StringType)
        .add("a12", StringType)))

df1.select(from_json($"new2", schema))

but it returned a one-column DataFrame of all nulls.
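
A quick probe suggests the schema itself is the mismatch: the inner "-1" holds a JSON array, not an object with twelve fields, and from_json returns null whenever the schema doesn't fit the document. For example (a sketch; the MapType shape is my guess at the real structure):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// guessing the real shape: string keys mapping to string keys mapping to an int array
val probeSchema = MapType(StringType, MapType(StringType, ArrayType(IntegerType)))

// this parses with no nulls, unlike the 12-field struct schema above
df.select(from_json($"column1", probeSchema).as("parsed")).show(false)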


2 Answers

0 votes

You can simply use the from_json built-in function to convert the JSON string into an actual struct, with the schema defined as StructType(Seq(StructField("-1", StructType(Seq(StructField("-1", ArrayType(IntegerType)))))))

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("-1", StructType(Seq(StructField("-1", ArrayType(IntegerType)))))))
val jsonedDF = df.select(from_json(col("column1"), schema).as("json"))
jsonedDF.show(false)
//    +---------------------------------------+
//    |json                                   |
//    +---------------------------------------+
//    |[[WrappedArray(7420, 0, 20, 22, 0, 0)]]|
//    |[[WrappedArray(1006, 2, 18, 10, 0, 0)]]|
//    |[[WrappedArray(6414, 0, 17, 11, 0, 0)]]|
//    +---------------------------------------+
jsonedDF.printSchema()
//    root
//    |-- json: struct (nullable = true)
//    |    |-- -1: struct (nullable = true)
//    |    |    |-- -1: array (nullable = true)
//    |    |    |    |-- element: integer (containsNull = true)

After that, just select the appropriate columns and use aliases to give the columns appropriate names:

jsonedDF.select(
  lit("-1").as("col1"),
  lit("-1").as("col2"),
  col("json.-1.-1")(0).as("col3"),
  col("json.-1.-1")(1).as("col4"),
  col("json.-1.-1")(2).as("col5"),
  col("json.-1.-1")(3).as("col6"),
  col("json.-1.-1")(4).as("col7"),
  col("json.-1.-1")(5).as("col8")
).show(false)

which should give you your final DataFrame:

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
|-1  |-1  |7420|0   |20  |22  |0   |0   |
|-1  |-1  |1006|2   |18  |10  |0   |0   |
|-1  |-1  |6414|0   |17  |11  |0   |0   |
+----+----+----+----+----+----+----+----+
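
If the array ever grows, the same select can be generated rather than written out by hand. A small sketch along the same lines (the length 6 and the col3 offset are just this example's shape):

// build col3..col8 from the array positions instead of listing them one by one
val arrayCols = (0 until 6).map(i => col("json.-1.-1")(i).as(s"col${i + 3}"))
jsonedDF.select(Seq(lit("-1").as("col1"), lit("-1").as("col2")) ++ arrayCols: _*).show(false)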

I have used -1 as literals, since they are the key names in the JSON string and will always be the same.
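
If the keys could ever differ from -1, one option (a sketch, only checked against the sample shape) is to parse with a MapType schema and explode the maps, so the keys come out of the data itself:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val mapSchema = MapType(StringType, MapType(StringType, ArrayType(IntegerType)))

df.select(from_json(col("column1"), mapSchema).as("m"))
  .select(explode(col("m")).as(Seq("col1", "inner")))                 // map -> key/value columns
  .select(col("col1"), explode(col("inner")).as(Seq("col2", "arr")))
  .select(
    col("col1"), col("col2"),
    col("arr")(0).as("col3"), col("arr")(1).as("col4"),
    col("arr")(2).as("col5"), col("arr")(3).as("col6"),
    col("arr")(4).as("col7"), col("arr")(5).as("col8")
  ).show(false)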

0 votes

You don't actually need to parse the JSON at all.

You can convert to an RDD of strings, strip all of the " [ ] { } characters, replace : with , so that each row becomes a single comma-separated string, and then convert it back to a DataFrame as below

import org.apache.spark.sql.types._
import spark.implicits._

// data as you provided
val df = Seq(
  ("""{"-1":{"-1":[ 7420,0,20,22,0,0]}}"""),
  ("""{"-1":{"-1":[1006,2,18,10,0,0]}}"""),
  ("""{"-1":{"-1":[6414,0,17,11,0,0]}}""")
).toDF("column1")

// create a schema for the eight output columns
val schema = new StructType()
  .add("col1", StringType)
  .add("col2", StringType)
  .add("col3", StringType)
  .add("col4", StringType)
  .add("col5", StringType)
  .add("col6", StringType)
  .add("col7", StringType)
  .add("col8", StringType)

// convert to an RDD of strings, strip the JSON punctuation, and split on commas
val df2 = df.rdd.map(_.getString(0))
  .map(_.replaceAll("[\"\\[\\]{}]", "").replace(":", ","))
  .map(_.split(",").map(_.trim))   // trim removes the stray space in " 7420"
  .map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
  .toDF(schema.fieldNames: _*)

OR

import org.apache.spark.sql.Row

val rdd = df.rdd.map(_.getString(0))
  .map(_.replaceAll("[\"\\[\\]{}]", "").replace(":", ","))
  .map(_.split(",").map(_.trim))
  .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

val finalDF = spark.createDataFrame(rdd, schema)

df2.show()
// or
finalDF.show()
// both produce the same output

Output:

+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|
+----+----+----+----+----+----+----+----+
|-1  |-1  |7420|0   |20  |22  |0   |0   |
|-1  |-1  |1006|2   |18  |10  |0   |0   |
|-1  |-1  |6414|0   |17  |11  |0   |0   |
+----+----+----+----+----+----+----+----+

Hope this helps!