2
votes

I have to update historical data. By update, I mean adding new rows and sometimes new columns to an existing partition on S3.

The current partitioning is implemented by date: created_year={}/created_month={}/created_day={}. In order to avoid too many objects per partition, I do the following to maintain a single object per partition:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    # One file per (created_year, created_month, created_day) partition.
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete at the following location:')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

A scenario exists where I have to add certain rows that have these columnar values:

| created_year | created_month | created_day |
| ------------ | ------------- | ----------- |
| 2019         | 10            | 27          |

This means that the file (S3 object) at this path: created_year=2019/created_month=10/created_day=27/some_random_name.parquet will have the new rows appended to it.

If there is a change in the schema, then all the objects will have to implement that change.

I looked into how this generally works, and there are two save modes of interest: overwrite and append.

The first one writes the current data and deletes everything else at the destination; I do not want that. The second one appends the new data but may end up creating more objects per partition; I do not want that either. I have also read that DataFrames are immutable in Spark.
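For reference, this is roughly what those two modes look like (using the same df and dest_path as in the function above); neither gives me a single object per partition:

# overwrite: replaces everything already under dest_path with the current data
df.write.mode("overwrite").partitionBy(
    "created_year", "created_month", "created_day").parquet(dest_path)

# append: keeps the existing objects and adds new parquet files next to them,
# so the number of objects per partition grows
df.write.mode("append").partitionBy(
    "created_year", "created_month", "created_day").parquet(dest_path)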

So, how do I append the new data to existing partitions as it arrives while maintaining one object per day?


1 Answer

3
votes

Based on your question, I understand that you need to add new rows to the existing data without increasing the number of parquet files. This can be achieved by operating on specific partition folders. There are three cases to consider.

1) New partition

This means the incoming data has a new value in the partition columns. In your case, it could look like this:

Existing data

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  1  |

New data

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  2  |

So, in this case, you can just create a new partition folder for the incoming data and save it as you did.

partition_path = "/path/to/data/year=2020/month=1/day=2"
new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)

2) Existing partition, new data

This is where you want to append new rows to the existing data. It could look like this:

Existing data

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

New data

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  b  |   1   |

Here we have a new record for the same partition. You could use append mode, but you want a single parquet file in each partition folder. That's why you should read the existing partition first, union it with the new data, and then write the result back, overwriting the folder.

partition_path = "/path/to/data/year=2020/month=1/day=1"
# If the partition columns only live in the directory names, reading with
# .option("basePath", "/path/to/data") brings them back as columns.
old_data = spark.read.parquet(partition_path)
write_data = old_data.unionByName(new_data)
# Overwrite the existing folder; note that Spark may refuse to overwrite a path
# it is still reading from, so stage the result in a temporary location if needed.
write_data.repartition(1, "year", "month", "day").write.mode("overwrite").parquet(partition_path)

3) Existing partition, existing data

What if the incoming data is an UPDATE, rather than an INSERT? In this case, you should update a row instead of inserting a new one. Imagine this:

Existing data

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |

New data

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   2   |

"a" had a value of 1 before, now we want it to be 2. So, in this case, you should read existing data and update existing records. This could be achieved like the following.

from pyspark.sql import functions as F

partition_path = "/path/to/data/year=2020/month=1/day=1"
old_data = spark.read.parquet(partition_path)
# Full outer join on the key columns, then prefer the new value where it exists.
write_data = old_data.join(new_data, ["year", "month", "day", "key"], "outer")
write_data = write_data.select(
    "year", "month", "day", "key",
    F.coalesce(new_data["value"], old_data["value"]).alias("value")
)
write_data.repartition(1, "year", "month", "day").write.mode("overwrite").parquet(partition_path)

When we outer join the old data with the new data, there are four possible outcomes:

  • both sides have the same value, so it doesn't matter which one we take
  • the two sides have different values, so take the new value
  • the old data doesn't have the row but the new data does, so take the new value
  • the new data doesn't have the row but the old data does, so keep the old value

To fulfill what we want here, coalesce from pyspark.sql.functions does the work: it returns the first non-null value among its arguments.
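
Below is a small, self-contained sketch with made-up keys and values (assuming an active spark session) that plays through those four cases:

from pyspark.sql import functions as F

old = spark.createDataFrame([("a", 1), ("b", 1), ("c", 1)], ["key", "value"])
new = spark.createDataFrame([("a", 1), ("b", 2), ("d", 5)], ["key", "value"])

joined = old.alias("old").join(new.alias("new"), "key", "outer")
result = joined.select(
    "key",
    F.coalesce(F.col("new.value"), F.col("old.value")).alias("value"),
)
result.orderBy("key").show()
# Expected result (roughly):
# key  value
#  a     1    <- same value on both sides
#  b     2    <- different values, the new one wins
#  c     1    <- only in the old data, kept
#  d     5    <- only in the new data, taken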

Note that this solution covers the second case as well.

About schema change

Spark supports schema merging for the parquet file format. This means you can add columns to or remove columns from your data. As you add or remove columns, you may notice that some columns are missing when reading the data from the top-level path. This is because Spark disables schema merging by default. From the documentation:

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

To be able to read all columns, you need to set the mergeSchema option to true.

df = spark.read.option("mergeSchema", "true").parquet(path)
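
If you would rather not pass the option on every read, the same behaviour can be enabled for the whole session through the corresponding Spark SQL config:

# Enable parquet schema merging for every read in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

df = spark.read.parquet(path)  # now merges the schemas of all compatible files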