2
votes

I am currently working with the adwords api and I have to process 1, 7 and 30 days of data. Thus, the spark job is basic, load the csv and write it into parquet with a partition:

df.write
  .mode("append")
  .format("parquet")
  .partitionBy("customer_id", "date")
  .option("path", warehouse_location+"/"+table)
  .saveAsTable(table)

Now the problem I'm facing is that 7 and 30 days will go through already processed data at some point (through the 1 day ago) and thus on my partition table/customer_id/date/file.parquet the append will append a second parquet file to this partition.

But in this specific case, I would like that the new parquet file overwrite the previous one (as adwords csv will be subject to change between the first day it's been generated and 7 / 30 days later).

I have looked around and if I try to use "overwrite", it overwrites the full table and not just the partition.

Do you have any suggestion about how to proceed here?

I'm not a Spark expert and right now my only option in mind is to have a script that will clean the place based on the file timestamp. But it doesn't feel like the right solution here.

PS: I'm using Spark 2.4

1
what version of pyspark?IWHKYB
I'm running it with the 2.4, I'll update the questionJay Cee
I faced the same problem. On paper there is a replace where function, that didnt work as described. The solution was to delete the partitions via sql query und then to append the partitions.Christopher
@Christopher thank for the input! does this work with format('parquet')?Jay Cee

1 Answers

1
votes

Based on SPARK-20236, you should set spark.sql.sources.partitionOverwriteMode="dynamic" property, and then use "overwrite" mode to replace individual partitions in existing table.