1
votes

I have a complex data format that, for the sake of argument, cannot be modified. Here is a dummy example JSON file from a database dump:

{"payload": {"table": "Table1", "data": {"colA": 1, "colB": 2}}}
{"payload": {"table": "Table2", "data": {"colA": 1, "colC": 2}}}

So each file in the directory contains data from multiple tables, each with its own schema. The schema can change over time as columns are added upstream, so we can't define a static schema and are relying on schema inference.

This is my current workflow (high-level):

  • Load entire directory of JSON data into one dataframe
  • Find unique tables present in this batch of changes
  • For each table, filter the dataframe down to just that table's data
  • Infer the schema from that table's subset (Table1 is A+B, Table2 is A+C)
  • Do some validation
  • Merge records with destination

Example code:

import pyspark.sql.functions as F

# Load raw lines, then pull out the table name and the raw JSON payload per record
df = spark.read.text(directory)
with_table_df = (
    df
    .withColumn("table", F.get_json_object('value', '$.payload.table'))
    .withColumn("json_payload_data", F.get_json_object('value', '$.payload.data'))
)
# Collect the distinct table names to the driver
unique_tables = with_table_df.select('table').distinct().rdd.map(lambda r: r[0]).collect()

for table in unique_tables:
    filtered_df = with_table_df.filter(f"table = '{table}'")
    # Infer this table's schema from its payload strings (runs a separate job per table)
    table_schema = spark.read.json(filtered_df.rdd.map(lambda row: row.json_payload_data)).schema

    changes_df = (
        filtered_df
        .withColumn('payload_data', F.from_json('json_payload_data', table_schema))
        .select('payload_data.*')
    )

    # do some validation
    if valid:
        changes_df.write.mode("append").option("mergeSchema", "true").saveAsTable(target_table)

My issue is that I can't load the directory using spark.read.json(), since it will apply a superset schema to all records and I won't be able to determine which columns are Table1 columns and which are Table2 columns. Right now I am loading as text, extracting key JSON elements (payload.table), then only parsing as JSON once I have records that share a schema. This works, but it puts a lot of load on the driver node.

But I don't think filtering and iterating over the DataFrame rows is a good approach. I'd like to leverage foreachPartition in some way to push the validation/selection logic onto the executor nodes, but I can't because of the way the JSON schema is created with spark.read.json() (the SparkSession only exists on the driver, so it can't be used inside code running on the executors).
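For illustration, this is roughly the shape I had in mind (just a sketch; handle_partition and do_validation are made-up names) and why it doesn't work:

def handle_partition(rows):
    for row in rows:
        # Does NOT work: `spark` is the SparkSession, which only exists on
        # the driver; it can't be shipped to the executors, so spark.read.json()
        # can't be called from inside a partition function.
        schema = spark.read.json(
            spark.sparkContext.parallelize([row.json_payload_data])
        ).schema
        do_validation(row, schema)  # placeholder for the validation step

with_table_df.foreachPartition(handle_partition)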

How could I re-work this to be more suitable for the Spark architecture?

UPDATE:

I am looking to modify the data creation process so that the JSON files are partitioned by table; that way I can simply spark.read.json(table_path) for each table's path.
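Something like this (just a sketch; it assumes the dump ends up with one subdirectory per table, and valid/target_table are the same placeholders as in the code above):

for table_dir in table_dirs:  # one subdirectory per table, e.g. <root>/Table1/
    # Schema inference now only ever sees one table's records
    changes_df = spark.read.json(table_dir)

    # do some validation
    if valid:
        changes_df.write.mode("append").option("mergeSchema", "true").saveAsTable(target_table)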

In your current example, are the two lines supposed to have different schemas? Because to me they look the same. – Steven
Table1 has columns A and B, Table2 has columns A and C. – TomNash
The other issue, like I said, is that tables can add columns over time. – TomNash
And you can't separate the files by table name? – Steven
Unfortunately no. If I could map records to files by table, it would be much easier. It's a dump of all tables partitioned by time. – TomNash

2 Answers

1
votes

I created a dummy file with your data.

Here is the simple code that you want to avoid:

df = spark.read.json("test.json")

df.show()
+-----------------+
|          payload|
+-----------------+
|[[1, 2,], Table1]|
|[[1,, 2], Table2]|
+-----------------+

df.printSchema()
root
 |-- payload: struct (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- colA: long (nullable = true)
 |    |    |-- colB: long (nullable = true)
 |    |    |-- colC: long (nullable = true)
 |    |-- table: string (nullable = true)

The problem here is that every new "col*" ends up in one superset schema that is applied to every row, regardless of which table the row comes from. The inference is automatic, but it is not convenient.

You need to trick your schema a little bit for that: make the data field a map instead of a struct:

from pyspark.sql import types as T

schm = T.StructType(
    [
        T.StructField(
            "payload",
            T.StructType(
                [
                    T.StructField("data", T.MapType(T.StringType(), T.IntegerType())),
                    T.StructField("table", T.StringType()),
                ]
            ),
        )
    ]
)


df = spark.read.json("test.json", schema=schm)

df.show()
+--------------------+                                                          
|             payload|
+--------------------+
|[[colA -> 1, colB...|
|[[colA -> 1, colC...|
+--------------------+

df.printSchema()
root
 |-- payload: struct (nullable = true)
 |    |-- data: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: integer (valueContainsNull = true)
 |    |-- table: string (nullable = true)
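From there, a rough sketch of how the per-table split from your question could still be done on top of the map schema (I just pull out whatever keys show up for each table; adapt as needed):

import pyspark.sql.functions as F

# Collect the table names, then turn the map entries back into real columns.
# map_keys/explode are only used to discover which keys exist per table.
tables = [r[0] for r in df.select("payload.table").distinct().collect()]

for table in tables:
    table_df = df.filter(F.col("payload.table") == table)
    cols = [
        r[0]
        for r in table_df
        .select(F.explode(F.map_keys("payload.data")).alias("key"))
        .distinct()
        .collect()
    ]
    changes_df = table_df.select(
        *[F.col("payload.data").getItem(c).alias(c) for c in cols]
    )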
1
votes

Your schemas aren't different. You have a "payload" struct containing a "table" string and a "data" map.

If you are unsure how to define the schema for "data", refer to MapType:

from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField

data_schema = MapType(StringType(), IntegerType(), False)
payload_schema = StructType(
    [
        StructField("table", StringType(), False),
        StructField("data", data_schema, False),
    ]
)
schema = StructType(
    [
        StructField("payload", payload_schema, False),
    ]
)
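A minimal usage sketch with that schema (reusing the test.json file from the first answer):

df = spark.read.json("test.json", schema=schema)
df.select("payload.table", "payload.data").show(truncate=False)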