
I am reading a set of 10,000 ORC files with a cumulative size of 10 TB from HDFS and writing them back to HDFS in a partitioned manner using the following code:

spark.read.orc("HDFS_LOC").repartition(col("x")).write.partitionBy("x").orc("HDFS_LOC_1")

I am using

spark.sql.shuffle.partitions=8000

I see that Spark wrote 5,000 different partitions of "x" to HDFS (HDFS_LOC_1). How are the 8,000 shuffle partitions used in this process? I also see that only 15,000 files in total were written across all partitions of "x". Does this mean that Spark tried to create 8,000 files in every partition of "x", found at write time that there was not enough data for 8,000 files per partition, and ended up writing fewer files? Can you please help me understand this?

When Spark reads data from HDFS, the default number of partitions in the resulting DataFrame depends on the input splits created by the Hadoop InputFormat used to read the files. – Mohd Avais
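For illustration, the initial partition count can be inspected right after reading, before any shuffle happens (a minimal sketch in spark-shell, using the path from the question):

val df = spark.read.orc("HDFS_LOC")
// roughly one partition per HDFS input split of the source files
println(df.rdd.getNumPartitions)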

1 Answer


The setting spark.sql.shuffle.partitions=8000 sets the default number of shuffle partitions for your Spark program. If you execute a join or an aggregation after setting this option, you will see this number take effect (you can confirm it with df.rdd.getNumPartitions()). Please refer here for more information.
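A minimal sketch of that check (run in spark-shell; the aggregation on "x" is only an illustration, and adaptive query execution in Spark 3.x may coalesce the shuffle partitions):

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.shuffle.partitions", "8000")
val df = spark.read.orc("HDFS_LOC")

// an aggregation forces a shuffle, so its result uses the configured number of partitions
val counts = df.groupBy(col("x")).count()
println(counts.rdd.getNumPartitions)   // 8000 (unless adaptive execution coalesces them)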

In your case, though, you are using this setting together with repartition(col("x")) and partitionBy("x"). Note that repartition(col("x")), called without an explicit partition count, also falls back to spark.sql.shuffle.partitions as its target number of partitions, so the data is hash-partitioned by x into 8000 in-memory partitions, of which at most cardinality("x") actually hold data. The difference between repartition and partitionBy is that the first partitions the data in memory, while the second drives the directory layout on HDFS, writing approximately cardinality("x") partition directories. Why approximately? Because there are more factors that determine the exact number of output files and directories. Please check additional resources on this topic to get a better understanding.
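You can also verify in the shell that repartition(col("x")) picks up the configured 8000 (a sketch, reusing the path from the question):

import org.apache.spark.sql.functions.col

val repartitioned = spark.read.orc("HDFS_LOC").repartition(col("x"))
// hash-partitioned by "x" into spark.sql.shuffle.partitions partitions,
// but only the partitions that a value of "x" hashes to will actually contain rows
println(repartitioned.rdd.getNumPartitions)   // 8000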

So the first thing to consider when repartitioning by column, with repartition(*cols) or partitionBy(*cols), is the number of unique values (the cardinality) of the column, or of the combination of columns.
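A quick way to check that cardinality before choosing a partitioning strategy (a sketch; the column name and path are the ones from the question):

val cardinality = spark.read.orc("HDFS_LOC").select("x").distinct().count()
println(cardinality)   // ~5000 here, i.e. the number of "x" folders to expect on HDFS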

That being said, if you want to ensure that 8000 partitions are created, use repartition(partitionsNum, col("x")) with partitionsNum == 8000 in your case, and then call write.orc("HDFS_LOC_1"); keep in mind that partitions left empty by the hash on x will not produce output files. Otherwise, if you want to keep the number of partitions close to the cardinality of x, just call write.partitionBy("x").orc("HDFS_LOC_1") on your original df to store the data to HDFS. This will create cardinality(x) folders containing your partitioned data.
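Put together, the two options look roughly like this (a sketch; the partition count and paths are the ones from the question, and you would pick one of the two writes):

import org.apache.spark.sql.functions.col

val df = spark.read.orc("HDFS_LOC")

// Option 1: fix the number of in-memory partitions explicitly
// (at most 8000 output files, since partitions left empty by the hash on "x" write nothing)
df.repartition(8000, col("x")).write.orc("HDFS_LOC_1")

// Option 2: keep df as-is and let partitionBy drive the layout:
// one folder per distinct value of "x"
df.write.partitionBy("x").orc("HDFS_LOC_1")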