This is working very well for me:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
It produces n files in each output partition (directory), and is (anecdotally) faster than using coalesce,
and (again anecdotally, on my data set) faster than only repartitioning on the output.
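Spelled out as a self-contained Scala snippet (the input path, n = 10, and the "key" column are just placeholders for illustration; in Scala, repartition takes Column arguments, so it's col("key") rather than the bare string that works in PySpark):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("partitioned-write").getOrCreate()

val data = spark.read.parquet("/input") // placeholder input
val n = 10                              // placeholder partition count

data
  .repartition(n, col("key"))  // hash-shuffle rows on "key" into n partitions
  .write
  .partitionBy("key")          // one output directory per distinct "key" value
  .parquet("/location")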
If you're working with S3, I also recommend doing everything on local drives (Spark does a lot of file creation/rename/deletion during write-outs) and, once it has all settled, using Hadoop's FileUtil
(or just the AWS CLI) to copy everything over:
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession

// ...

// Copies a file or directory tree from `in` to `out`, resolving the right
// FileSystem (local, HDFS, S3, ...) for each URI from its scheme.
def copy(in: String, out: String, sparkSession: SparkSession): Boolean = {
  val conf = sparkSession.sparkContext.hadoopConfiguration
  FileUtil.copy(
    FileSystem.get(new URI(in), conf),
    new Path(in),
    FileSystem.get(new URI(out), conf),
    new Path(out),
    false, // deleteSource = false: keep the local copy in place
    conf
  )
}
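For example (both paths and the bucket name below are made up; swap in whatever scheme your cluster uses, e.g. s3a://):

// Write locally first, then push the finished output up to S3 in one call.
copy("file:///tmp/spark-staging/output", "s3a://my-bucket/tables/output", spark)

Here spark is your SparkSession.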
Edit: As per discussion in comments:
You have a dataset with a partition column of YEAR, but each YEAR contains vastly different amounts of data. So one year might have 1 GB of data, while another has 100 GB.
Here's a sketch of one way to handle this:
val partitionSize = 10000L // Number of rows you want per output file.

// Pull the distinct YEAR values back to the driver so we can loop over them
// (assumes YEAR is an integer column).
val yearValues = df.select("YEAR").distinct.collect().map(_.getInt(0))

yearValues.foreach { yearVal =>
  val subDf = df.filter(s"YEAR = $yearVal")
  // count returns a Long; use at least one partition even for tiny years.
  val numPartitionsToUse = math.max(1, (subDf.count / partitionSize).toInt)
  subDf.repartition(numPartitionsToUse).write.parquet(s"$outputPath/year=$yearVal")
}
But I don't actually know whether this will work. It's possible that Spark will have an issue reading in a variable number of files per column partition.
Another way to do it would be to write your own custom partitioner, but I have no idea what's involved in that, so I can't supply any code.