I have a (Scala/Spark) DataFrame df that I would like to save to parquet with roughly 128MB per parquet file. Based on the schema, I roughly estimated the size, in bytes, of one row. My strategy is to repartition the DataFrame with a value that deliberately produces "oversized" partitions, then leverage the maxRecordsPerFile option to get the parquet file size I want:
val countLines = df.count
val estimatedSize = countLines * 250 / (1024 * 1024) // estimated total size in MB, one row is around 250 bytes
val repartitionEstimate = (estimatedSize / 512).toInt + 1 // I "under"-repartition so each partition holds roughly 512MB
val maxRecordsPerFile = ((128 / 250.0) * 1024 * 1024).toLong // cap the records per file so each parquet file lands around 128MB

df.repartition(repartitionEstimate)
  .write
  .option("maxRecordsPerFile", maxRecordsPerFile)
  .partitionBy("year", "month", "day", "hour")
  .parquet(outputPath)
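With the 250-byte estimate, maxRecordsPerFile comes out to roughly 536,870 rows (536,870 × 250 bytes ≈ 128MB), so each ~512MB partition should split into about four full-sized files plus a smaller remainder file.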
When I did this, I got parquet files of about 128MB in each of my folders, but some of them were very tiny, around 10MB. How could I solve this? I would rather have one 133MB file than a 128MB file plus a 5MB file, so I would like my solution to be a little more flexible. Thanks in advance!
EDIT: The reason I am not repartitioning directly based on the 128MB size is that, while it works fine when I write to a single folder, it seems to "break" with the hourly folders: in one I had mostly 80MB files, in another mostly 40MB files...
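For reference, the direct approach looked roughly like this (a sketch with illustrative variable names, reusing the same 250-byte row estimate):

val totalSizeMb = df.count * 250 / (1024 * 1024)  // estimated total size in MB
val numPartitions = (totalSizeMb / 128).toInt + 1 // aim for ~128MB per partition

df.repartition(numPartitions)
  .write
  .partitionBy("year", "month", "day", "hour")
  .parquet(outputPath)

That is the version that produced the uneven per-hour file sizes described above.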