
I'm working on transforming a JSON document into a DataFrame. As a first step I create an array of DataFrames and then union them. But I have a problem performing that union when the JSON objects have different schemas.

I can do it when the JSON objects share the same schema, as you can see in this other question: Parse JSON root in a column using Spark-Scala

I'm working with the following data:

import spark.implicits._  // provides the String encoder needed by createDataset

val exampleJsonDifferentSchema = spark.createDataset(
  """
  {"ITEM1512":
        {"name":"Yin",
         "address":{"city":"Columbus",
                    "state":"Ohio"},
         "age":28},
   "ITEM1518":
        {"name":"Yang",
         "address":{"city":"Working",
                    "state":"Marc"}},
   "ITEM1458":
        {"name":"Yossup",
         "address":{"city":"Macoss",
                    "state":"Microsoft"},
         "age":28}
  }""" :: Nil)

As you can see, the difference is that one of the items (ITEM1518) doesn't have the age field.

val itemsExampleDiff = spark.read.json(exampleJsonDifferentSchema)
itemsExampleDiff.show(false)
itemsExampleDiff.printSchema

+---------------------------------+---------------------------+-----------------------+
|ITEM1458                         |ITEM1512                   |ITEM1518               |
+---------------------------------+---------------------------+-----------------------+
|[[Macoss, Microsoft], 28, Yossup]|[[Columbus, Ohio], 28, Yin]|[[Working, Marc], Yang]|
+---------------------------------+---------------------------+-----------------------+

root
 |-- ITEM1458: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

My current solution is the following code, where I build an array of DataFrames:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

val columns: Array[String] = itemsExampleDiff.columns
var arrayOfExampleDFs: Array[DataFrame] = Array()

for (col_name <- columns) {
  // one DataFrame per item: the item id as a literal plus its struct value
  val temp = itemsExampleDiff.select(lit(col_name).as("Item"), col(col_name).as("Value"))
  arrayOfExampleDFs = arrayOfExampleDFs :+ temp
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

But because the JSON has different schemas, the union fails when I reduce: the DataFrames need to have the same schema. In fact, I get the following error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types.
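
You can make the mismatch visible by printing each Value schema before attempting the union (a quick diagnostic sketch, reusing the arrayOfExampleDFs built above):

// the ITEM1518 struct is missing the age field, which is what breaks the union
arrayOfExampleDFs.foreach { df =>
  println(df.schema("Value").dataType.simpleString)
}
// struct<address:struct<city:string,state:string>,age:bigint,name:string>
// struct<address:struct<city:string,state:string>,age:bigint,name:string>
// struct<address:struct<city:string,state:string>,name:string>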

I'm trying to do something similar to what I found in this question: How to perform union on two DataFrames with different amounts of columns in spark?

Specifically this part:

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}
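
For reference, the linked answer then applies this helper on both sides before the union, roughly like this (a sketch, assuming two flat DataFrames df1 and df2):

// pad each side with null literals for the columns it lacks, then union
val unioned = df1.select(expr(cols1, total): _*)
                 .union(df2.select(expr(cols2, total): _*))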

But I can't build those column sets statically, because I need to pick up the columns dynamically, both the total set and each DataFrame's own. The best I can do is something like this:

for (i <- 0 until arrayOfExampleDFs.length - 1) {
  val cols1 = arrayOfExampleDFs(i).select("Value").columns.toSet
  val cols2 = arrayOfExampleDFs(i + 1).select("Value").columns.toSet
  val total = cols1 ++ cols2

  arrayOfExampleDFs(i).select("Value").printSchema()
  print(total)
}
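
Note that select("Value").columns only ever returns the single top-level column name, Value, so cols1 and cols2 never actually differ; the fields that diverge live inside the struct. Reaching them looks roughly like this (a sketch against the arrayOfExampleDFs above):

import org.apache.spark.sql.types.StructType

for (df <- arrayOfExampleDFs) {
  // pull the nested field names out of the Value struct
  val valueFields = df.schema("Value").dataType
    .asInstanceOf[StructType]
    .fieldNames
    .toSet
  println(valueFields) // Set(address, age, name) vs Set(address, name)
}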

So, what would a function that performs this union dynamically look like?

Update: expected output

In this case, this DataFrame and schema:

+--------+---------------------------------+
|Item    |Value                            |
+--------+---------------------------------+
|ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
|ITEM1512|[[Columbus, Ohio], 28, Yin]      |
|ITEM1518|[[Working, Marc], null, Yang]    |
+--------+---------------------------------+

root
 |-- Item: string (nullable = false)
 |-- Value: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- age: long (nullable = true)
 |    |-- name: string (nullable = true)
If you know all possible columns you want to consider, you can simply create a list with those and use it. Then you won't need to compute a new set in each iteration. - Shaido
You may also read the JSON with a predefined schema that has all expected columns, where some columns can be null. This works if you know the union of possible schemas beforehand. - Sudev Ambadi
But before reading it as JSON I need to do a transformation, because the items (ITEM1458, ITEM1512, ITEM1518, etc.) appear as columns and I need to turn those columns into values. That is the problem I was able to solve here (for a JSON with a single schema): stackoverflow.com/questions/61669258/… - jqc
Did you solve this one, @jqc? - abiratsis
No, I couldn't solve it. @AlexandrosBiratsis - jqc
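
For reference, the predefined-schema approach suggested in the comments might look roughly like this (a sketch; note it still leaves the item ids as columns, which is the remaining problem jqc mentions):

import org.apache.spark.sql.types._

// one shared per-item schema: age stays nullable, so missing values become null
val itemSchema = new StructType()
  .add("address", new StructType()
    .add("city", StringType)
    .add("state", StringType))
  .add("age", LongType)
  .add("name", StringType)

val predefinedSchema = new StructType()
  .add("ITEM1458", itemSchema)
  .add("ITEM1512", itemSchema)
  .add("ITEM1518", itemSchema)

val itemsWithSchema = spark.read.schema(predefinedSchema).json(exampleJsonDifferentSchema)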

1 Answer


Here is one possible solution, which creates a common schema for all the DataFrames by adding the age column where it is not found:

import org.apache.spark.sql.functions.{col, lit, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

....

for(col_name <- columns){
  val currentDf = itemsExampleDiff.select(col(col_name))

  // try to identify if age field is present
  val hasAge = currentDf.schema.fields(0)
                        .dataType
                        .asInstanceOf[StructType]
                        .fields
                        .contains(StructField("age", LongType, true))

  val valueCol = hasAge match {
    // if not construct a new value column
    case false => struct(
                    col(s"${col_name}.address"), 
                    lit(null).cast("bigint").as("age"),
                    col(s"${col_name}.name")
                  )

    case true => col(col_name)
  }

  arrayOfExampleDFs = arrayOfExampleDFs :+ currentDf.select(lit(col_name).as("Item"), valueCol.as("Value"))
}

val jsonDF = arrayOfExampleDFs.reduce(_ union _)

// +--------+---------------------------------+
// |Item    |Value                            |
// +--------+---------------------------------+
// |ITEM1458|[[Macoss, Microsoft], 28, Yossup]|
// |ITEM1512|[[Columbus, Ohio], 28, Yin]      |
// |ITEM1518|[[Working, Marc],, Yang]         |
// +--------+---------------------------------+

Analysis: probably the most demanding part is finding out whether age is present or not. For the lookup we use the df.schema.fields property, which allows us to dig into the internal schema of each column.

When age is not found, we regenerate the column using a struct:

struct(
   col(s"${col_name}.address"), 
   lit(null).cast("bigint").as("age"),
   col(s"${col_name}.name")
)
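
The lit(null).cast("bigint") is what makes the union type-compatible: without the cast the null literal would have NullType and the Value structs would still differ. If you would rather not hard-code the field type in the presence check, matching by name alone also works (a sketch):

// type-agnostic alternative to the StructField comparison above
val hasAge = currentDf.schema.fields(0)
  .dataType.asInstanceOf[StructType]
  .fieldNames
  .contains("age")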