1
votes

I have a json file, which contains data that looks like this:

"Url": "https://sample.com", "Method": "POST", "Headers": [{"Key": "accesstoken", "Value": ["123"]}, {"Key": "id", "Value": ["abc"]}, {"Key": "context", "Value": ["sample"]}]

When reading the json, I explicitly define the schema as:

schema = StructType(
    [
      StructField('Url', StringType(), True),
      StructField('Method', StringType(), True),
      StructField("Headers",ArrayType(StructType([
        StructField('Key', StringType(), True),
        StructField("Value",ArrayType(StringType()),True),
      ]),True),True)
    ]
  )

The goal is to read the Key-Value data as columns instead of rows.

Url Method accesstoken id context
https://sample.com POST 123 abc sample

Exploding the "Headers" column only transforms it into multiple rows. Another problem with the data is that, instead of having a literal key-value pair (e.g. "accesstoken": "123"), my key value pair value is stored in 2 separate pairs!

I tried to iterate over the values to create a map first, but I am not able to iterate through the "Headers" column.

df_map = df.withColumn('map', to_json(array(*[create_map(element.Key, element.Value) for element in df.Headers])))

I also tried to read the "Headers" column as MapType(StringType, ArrayType(StringType)), but it could not read then value. It shows null when I did that.

Is there anyway to achieve this? Do I have to read the data as a plain text and pre-process the data instead of the dataframe?

1

1 Answers

1
votes

You were in the right way, but to concatenate your map must use reduce expression:

from pyspark.sql.types import *
import pyspark.sql.functions as f

# [...] Your dataframe initialization

df = df.select('Url', 'Method', f.explode(f.expr('REDUCE(Headers, cast(map() as map<string, array<string>>), (acc, el) -> map_concat(acc, map(el.Key, el.Value)))')))

# Transform key:value into columns
df_pivot = (df
            .groupBy('Url', 'Method')
            .pivot('key')
            .agg(f.first('value')))

array_columns = [column for column, _type in df_pivot.dtypes if _type.startswith('array')]
df_pivot = (df_pivot
            .withColumn('zip', f.explode(f.arrays_zip(*array_columns)))
            .select('Url', 'Method', 'zip.*'))

df_pivot.show(truncate=False)

Output

+------------------+------+-----------+-------+---+
|Url               |Method|accesstoken|context|id |
+------------------+------+-----------+-------+---+
|https://sample.com|POST  |123        |sample |abc|
+------------------+------+-----------+-------+---+