18
votes

I am writing an ETL process where I will need to read hourly log files, partition the data, and save it. I am using Spark (in Databricks). The log files are CSV, so I read them, apply a schema, and then perform my transformations.
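Roughly, the read step looks like this (a sketch with an illustrative schema and placeholder path, assuming Spark 2.x's built-in CSV reader; the year/month/day partition columns are derived later in the transformations):

import org.apache.spark.sql.types._

// Illustrative schema; the real log files have more columns
val logSchema = StructType(Seq(
  StructField("partnerID", StringType),
  StructField("eventTime", TimestampType),
  StructField("payload", StringType)
))

val data = spark.read
  .schema(logSchema)
  .option("header", "true")   // assumption: the CSVs have a header row
  .csv(hourlyLogPath)         // hourlyLogPath is a placeholder for the hour's folder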

My problem is: how can I save each hour's data in Parquet format while appending to the existing data set? When saving, I need to partition by 4 columns present in the dataframe.

Here is my save line:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

The problem is that if the destination folder exists, the save throws an error. If the destination doesn't exist, then I am not appending my files.

I've tried using .mode("append"), but I find that Spark sometimes fails midway through, so I end up losing track of how much of my data has been written and how much I still need to write.
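For reference, the append attempt is roughly the same save line with the mode added:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .mode("append")
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)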

I am using Parquet because the partitioning substantially speeds up my future queries. Also, I must write the data as some file format on disk and cannot use a database such as Druid or Cassandra.

Any suggestions for how to partition my dataframe and save the files (either sticking with Parquet or using another format) are greatly appreciated.

2
Can you share the error you get when you use .mode("append")? - Shagun Sodhani
The error I get is this: Caused by: java.io.IOException: File already exists:/tracking/v4/010316/gif=a/partnerID=111/year=2016/month=1/day=3/part-r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet I think that this error is thrown due to a task scheduling mismatch when some of the write operations take a long time. - Saman
@Saman How did you solve this issue? I am trying to handle a similar situation here: more than 10,000 partitions, in fact even more, since there is one partition per hour. I am trying to build a solution around this; any suggestions? Thanks. - ds_user
@ds_user, using the latest version of Spark (I am using the Databricks Runtime Environment, which has many features beyond open-source Spark), you can use the "append" mode in the writer. Before you solve this issue, though, you should reconsider the partition column(s) that are resulting in 10k+ partitions. I don't know how much data you have, but is it possible to partition by day instead of hour? Try repartitioning your data so that you end up with 200-500 MB files after you write them. Are you trying to write all 10k partitions at once, or is each batch only a few partitions? - Saman
This is the problem I am trying to solve: stackoverflow.com/questions/50197782/… . I am trying to partition by listing ID (or product ID), so I want to store listingId=612/year=2018/month=3/day=5 if I roll up to daily partitions for every listing. But that is still too many partitions because of the increasing number of listings. Any suggestions? - ds_user

2 Answers

14
votes

If you need to append the files, you definitely have to use the append mode. I don't know how many partitions you expect it to generate, but I find that if you have many partitions, partitionBy causes a number of problems (memory and IO issues alike).

If you think that your problem is caused by write operations taking too long, I recommend that you try these two things:

1) Use snappy compression by adding it to the configuration:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) Disable generation of the metadata files in the hadoopConfiguration on the SparkContext like this:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

The metadata files are somewhat time-consuming to generate (see this blog post), but according to this they are not actually important. Personally, I always disable them and have no issues.
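Putting the two settings together with an append write, a minimal sketch (assuming Spark 2.x with a SparkSession named spark; on 1.x you would use sqlContext.setConf and sc.hadoopConfiguration instead):

// Runtime configuration: snappy compression and no summary-metadata files
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

data
  .filter(validPartnerIds($"partnerID"))
  .write
  .mode("append")
  .partitionBy("partnerID","year","month","day")
  .parquet(saveDestination)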

If you generate many partitions (> 500), I'm afraid the best I can suggest is that you look into a solution that does not use append mode; I simply never managed to get partitionBy to work with that many partitions.

1
votes

If you're using unsorted partitioning, your data will be split across all of your Spark partitions. That means every task generates and writes data to each of your output files.

Consider repartitioning your data on your partition columns before writing, so that all of the data for each output file ends up in the same Spark partition:

data
 .filter(validPartnerIds($"partnerID"))
 .repartition($"partnerID", $"year", $"month", $"day") // optionally pass a partition count first, e.g. .repartition(n, $"partnerID", ...)
 .write
 .partitionBy("partnerID","year","month","day")
 .parquet(saveDestination)

See: DataFrame.repartition