I have a complex data format that, for the sake of argument, cannot be modified. Here is a dummy example JSON file from a database dump:
{"payload": {"table": "Table1", "data": {"colA": 1, "colB": 2}}}
{"payload": {"table": "Table2", "data": {"colA": 1, "colC": 2}}}
So each file in the directory contains data from multiple tables, each with its own schema. The schemas can change over time as columns are added upstream, so we can't define static schemas and are relying on schema inference.
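For example, if a column were added to Table1 upstream, later records for that table would start carrying an extra field (a hypothetical line, just to illustrate the drift):
{"payload": {"table": "Table1", "data": {"colA": 1, "colB": 2, "colD": 3}}}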
This is my current workflow (high-level):
- Load entire directory of JSON data into one dataframe
- Find unique tables present in this batch of changes
- For each table, filter the dataframe down to just that table's data
- Read the schema for the table subset (Table1 is A+B, Table2 is A+C)
- Do some validation
- Merge records with destination
Example code:
import pyspark.sql.functions as F

df = spark.read.text(directory)

with_table_df = (
    df
    .withColumn("table", F.get_json_object('value', '$.payload.table'))
    .withColumn("json_payload_data", F.get_json_object('value', '$.payload.data'))
)

unique_tables = with_table_df.select('table').distinct().rdd.map(lambda r: r[0]).collect()

for table in unique_tables:
    filtered_df = with_table_df.filter(f"table = '{table}'")
    table_schema = spark.read.json(filtered_df.rdd.map(lambda row: row.json_payload_data)).schema
    changes_df = (
        filtered_df
        .withColumn('payload_data', F.from_json('json_payload_data', table_schema))
        .select('payload_data.*')
    )
    # do some validation
    if valid:
        changes_df.write.mode("append").option("mergeSchema", "true").saveAsTable(target_table)
My issue is that I can't load the directory using spark.read.json(), since it will apply a superset schema to all records and I won't be able to determine which columns are Table1 columns and which are Table2 columns. Right now I am loading as text, extracting key JSON elements (payload.table), then only parsing as JSON once I have records that share a schema. This works, but it puts a lot of load on the driver node.
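To illustrate (a rough sketch; the exact printSchema output is approximated, not copied from a real run), reading the whole directory at once collapses everything into one inferred schema:

df = spark.read.json(directory)
df.printSchema()
# root
#  |-- payload: struct (nullable = true)
#  |    |-- data: struct (nullable = true)
#  |    |    |-- colA: long (nullable = true)
#  |    |    |-- colB: long (nullable = true)
#  |    |    |-- colC: long (nullable = true)
#  |    |-- table: string (nullable = true)
#
# Every record now appears to have colA, colB and colC; Table1 rows simply
# carry null for colC, so the per-table column sets are lost.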
But I don't think filtering and iterating over the DataFrame rows is a good approach. I'd like to leverage foreachPartition in some way to push the validation/selection logic out to the executor nodes, but I can't because of the way the JSON schema is created using spark.read.json() (the SparkSession can't be serialized out to the executors).
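Roughly what I have in mind (a hypothetical sketch, not working code; handle_partition is made up for illustration):

def handle_partition(rows):
    for row in rows:
        # I would like to infer this row's table schema here, but spark.read
        # (and the SparkSession in general) only exists on the driver, so
        # calling it inside a function shipped to the executors fails:
        # schema = spark.read.json(...).schema
        pass

with_table_df.foreachPartition(handle_partition)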
How could I re-work this to be more suitable for the Spark architecture?
UPDATE:
I am looking to modify the data creation process so that the JSON files are partitioned by table; that way I can simply spark.read.json(table_path) for each unique path.
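Something along these lines (the path layout below is hypothetical, just to show the intent):

base_path = "/dumps"  # hypothetical root of the partitioned dump
for table in ["Table1", "Table2"]:
    # each path now contains records for a single table, so schema inference
    # yields exactly that table's columns
    table_df = spark.read.json(f"{base_path}/table={table}")
    # validation / merge as before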