
I am having an issue parsing inconsistent datatypes in PySpark. As shown in the example file below, the SA key usually contains a dictionary, but sometimes it appears as a string value instead. When I try to fetch the column SA.SM.Name, I get the exception shown below.

How do I put null in the SA.SM.Name column in PySpark/Hive for the records whose SA value is not a JSON object? Can someone help me please?

I tried casting to different datatypes, but nothing worked; maybe I am doing something wrong.

Input file contents (my_path):

{"id":1,"SA":{"SM": {"Name": "John","Email": "[email protected]"}}}

{"id":2,"SA":{"SM": {"Name": "Jerry","Email": "[email protected]"}}}

{"id":3,"SA":"STRINGVALUE"}

df=spark.read.json(my_path)
df.registerTempTable("T")
spark.sql("""select id,SA.SM.Name from T """).show()

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 767, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Can't extract value from SA#6.SM: need struct type but got string; line 1 pos 10"

Could you post the schema of your df in the question? print(df.schema). If your SA is STRING type, then you have to write a function to convert it to a MAP type. – E.ZY
>>> df.schema StructType(List(StructField(SA,StructType(List(StructField(SM,StringType,true))),true),StructField(id,LongType,true))) – Vick
@e.zy: Can you please help on how to convert to MAP type? – Vick

1 Answer


That is not possible using DataFrames, since the column SA is read as a string while Spark loads it. But you can load the file/table with sparkContext as an RDD and then use a cleaner function that replaces string-valued SA entries with an empty dict. Here I loaded the file as a textFile, but adapt the loading step accordingly if it is a Hadoop file.

import json

def cleaner(record):
    # Parse the raw JSON line; fall back to an empty dict if it is malformed.
    output = {}
    try:
        output = json.loads(record)
    except Exception:
        print("exception happened while parsing the record")
    finally:
        # When SA came through as a plain string, replace it with an empty
        # dict so every record ends up with a consistent SA type.
        if isinstance(output.get("SA"), str):
            output["SA"] = {}
    return output

dfx = spark.sparkContext.textFile("file://" + my_path)
dfx2 = dfx.map(cleaner)
new_df = spark.createDataFrame(dfx2)
new_df.show(truncate=False)
+---------------------------------------------------+---+
|SA                                                 |id |
+---------------------------------------------------+---+
|[SM -> [Email -> [email protected], Name -> John]]  |1  |
|[SM -> [Email -> [email protected], Name -> Jerry]]|2  |
|[]                                                 |3  |
+---------------------------------------------------+---+

new_df.printSchema()
root
 |-- SA: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- id: long (nullable = true)
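
Since SA is now a map rather than a struct, the Name lookup from the question works with map key access (brackets instead of dots), and the record with the string value simply yields null. A minimal usage sketch; the view name T2 is just for illustration:

new_df.createOrReplaceTempView("T2")
spark.sql("""select id, SA['SM']['Name'] as Name from T2""").show()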


Note: if the output value of Name has to be written back to the same table/column, this solution might not work. If you try to write the loaded dataframe back to the same table, it will cause the SA column to break, and you will get maps of names and emails as per the schema shown in the comments of the question.
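
As an aside, depending on your Spark version you may be able to stay in DataFrames by forcing SA to be read as a raw JSON string and parsing it with from_json, which returns null for values that do not match the given struct schema. This is a hedged sketch, assuming the same my_path file as above; whether the JSON reader hands back the raw object text for a string-typed field can vary by version, so verify it on your cluster:

from pyspark.sql import functions as F
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Force SA to be read as a raw JSON string instead of an inferred struct.
raw_schema = StructType([
    StructField("id", LongType(), True),
    StructField("SA", StringType(), True),
])

# Shape of SA when it really is a JSON object.
sa_schema = StructType([
    StructField("SM", StructType([
        StructField("Name", StringType(), True),
        StructField("Email", StringType(), True),
    ]), True),
])

df2 = spark.read.schema(raw_schema).json(my_path)
# from_json yields null for anything that does not parse as sa_schema,
# so "STRINGVALUE" becomes null instead of breaking the query.
df2 = df2.withColumn("SA", F.from_json("SA", sa_schema))
df2.select("id", "SA.SM.Name").show()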