I'm quite new to Apache Spark and still struggling with it sometimes. I'm trying to import a fairly complex JSON file and flatten it before saving it to a Parquet file.

My JSON file is a tree of stores:

{
"id": "store02",
"name": "store name",
"domain": "domain",
"currency": "EUR",
"address1": "Somewhere",
"country": "GER",
"city": "Berlin",
"zipCode": "12345",
"timeZone": "CET",
"accounts" : [
    {
        "field1": "",
        "filed2": "",
        "field3": "",
        "optionnalArray1": [
            {
                "field1": "",
                "field2": ""
            }
        ],
        "optionnalArray2": ["aa", "bb"]
    }
],
"stores": [ .... ]    
}

Each store can have a field which is an array of accounts. An account has three mandatory fields and two optional ones, so I end up with a DataFrame field that can have three different types.

Importing the file into a DataFrame is no big deal, but during the flattening process I may want to union two DataFrames whose accounts have different schemas, and of course I get the following error: "Union can only be performed on tables with the compatible column types".

Is there an easy way to do this? How can Spark import such a JSON file without problems?
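
To illustrate the problem (a hedged sketch, assuming a SparkSession named spark, Spark 2.2+ for the multiLine option, and hypothetical file paths): schema inference only keeps the fields that actually occur in each file, so two subsets of the same data can end up with different account structs, and a union between them fails.

// Hypothetical illustration: the optional arrays appear in one file but not
// the other, so Spark infers two incompatible account structs.
val full    = spark.read.option("multiLine", "true").json("stores_full.json")
val partial = spark.read.option("multiLine", "true").json("stores_partial.json")

full.printSchema()     // accounts: array<struct<field1, field2, field3, optionalArray1, optionalArray2>>
partial.printSchema()  // accounts: array<struct<field1, field2, field3>>

full.union(partial)    // AnalysisException: Union can only be performed on tables with the compatible column types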

@Ramesh

Let's say I have two DataFrames. The first one is a DataFrame of stores without accounts. The second one is a DataFrame of stores with accounts. An account is a struct defined like this:

import org.apache.spark.sql.types._

val acquirerStruct = StructType(
    StructField("merchantId", StringType, nullable = true) ::
    StructField("name", StringType, nullable = true) ::
    Nil)

val accountStruct = StructType(
    StructField("acquirers", ArrayType(acquirerStruct), nullable = true) ::
    StructField("applicationCode", StringType, nullable = true) ::
    StructField("channelType", StringType, nullable = true) ::
    StructField("id", StringType, nullable = true) ::
    StructField("terminals", ArrayType(StringType), nullable = true) ::
    Nil)

When I want to union the two DataFrames, I first add an account column to the first one:

import org.apache.spark.sql.functions.{array, lit}

df1.withColumn("account", array(lit(null).cast(accountStruct))).union(df2)

If, in df2, all rows have an account with the same structure as accountStruct, it works fine. But that is not always true: an account may have no terminals or acquirers, which is perfectly valid JSON. In that case I get the error mentioned before:

"Union can only be performed on tables with the compatible column types"
Can you share what you've tried so far? – Ramesh Maharjan
You should add the answer to the question itself. Please do so :) – Ramesh Maharjan
OK, done. I didn't know it worked this way. – user9349304
Can't you write an if/else statement to check for the account column in df2? – Ramesh Maharjan
Yes, I could, but I would like to avoid that and keep the code as simple as possible. In fact I already have a workaround that is pretty much the same, but I'm quite frustrated not to be able to do such a thing in one line of code, and I don't understand why Spark seems perfectly able to deal with columns of this type yet doesn't let me do the same. – user9349304

1 Answer

I had the same issue in PySpark. I solved it by providing the schema when reading the incompatible DataFrame:

import copy
...
# Reuse the schema of the DataFrame that loaded correctly so both
# DataFrames end up with identical column types.
schema_to_read = copy.deepcopy(df1.schema)
df2 = sql_context.read.format("json").schema(schema_to_read).load(path)
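
For the Scala code in the question, the same idea might look like this (a sketch; the path is hypothetical, and df1.schema can be passed straight to the reader since StructType is immutable):

// Reuse df1's schema when reading the second file so both DataFrames
// get identical column types before the union.
val df2 = spark.read
    .schema(df1.schema)
    .option("multiLine", "true")
    .json("stores2.json")  // hypothetical path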