0 votes

I have a Parquet file /df saved in HDFS with 120 partitions. The size of each partition on HDFS is around 43.5 MB.

Total and per-partition sizes on HDFS:

hdfs dfs -du -s -h /df
5.1 G  15.3 G  /df
hdfs dfs -du -h /df
43.6 M  130.7 M  /df/pid=0
43.5 M  130.5 M  /df/pid=1
...
43.6 M  130.9 M  /df/pid=119

I want to load that file into Spark and keep the same number of partitions. However, Spark will automatically load the file into 60 partitions.

df = spark.read.parquet('df')
df.rdd.getNumPartitions()
60

HDFS settings:

'parquet.block.size' is not set.

sc._jsc.hadoopConfiguration().get('parquet.block.size')

returns nothing.

'dfs.blocksize' is set to 128 MB.

float(sc._jsc.hadoopConfiguration().get("dfs.blocksize"))/2**20

returns

128

Changing either of those values to something lower does not make Spark load the Parquet file with the same number of partitions as it has on HDFS.

For example:

sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 64*2**20)
sc._jsc.hadoopConfiguration().setInt("dfs.blocksize", 64*2**20)

I realize 43.5 MB is well below 128 MB. However, for this application, I am going to immediately perform many transformations that will bring each of the 120 partitions much closer to 128 MB.

I am trying to save myself having to repartition in the application immediately after loading.

Is there a way to force Spark to load the Parquet file with the same number of partitions that it has in HDFS?
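
For reference, the explicit repartition I am trying to avoid would look roughly like this (just a sketch of the workaround, not what I want):

df = spark.read.parquet('df').repartition(120)  # forces 120 partitions, but adds a full shuffle
df.rdd.getNumPartitions()
120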

Comments:

what if you try setting this parameter to 43.5MB (43500000 bytes)? spark.conf.set("spark.files.maxPartitionBytes", 43500000) – thePurplePython

Nope. Still pulls it into 60 partitions. – Clay

2 Answers

0 votes

First, I'd start by checking how Spark splits the data into partitions. By default this depends on the nature and size of your data and cluster. This article should explain why your data frame was loaded into 60 partitions:

https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html

In general, it's Catalyst that takes care of all the optimization (including the number of partitions), so unless there is a really good reason for custom settings, I'd let it do its job. If any of the transformations you use are wide, Spark will shuffle the data anyway.
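
For example (a sketch, assuming df is the data frame from the question and pid is its partition column): a wide transformation such as a groupBy triggers a shuffle, and the partition count after it is governed by spark.sql.shuffle.partitions, not by how the file was read.

agg = df.groupBy('pid').count()  # wide transformation, so the data is shuffled
agg.rdd.getNumPartitions()       # typically 200, the spark.sql.shuffle.partitions default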

0 votes

I can use the spark.sql.files.maxPartitionBytes property to keep the partition sizes where I want them when importing.

The Other Configuration Options documentation for the spark.sql.files.maxPartitionBytes property states:

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

Example (where spark is a working SparkSession):

spark.conf.set("spark.sql.files.maxPartitionBytes", 67108864) ## 64Mbi

To control the number of partitions during transformations, I can set spark.sql.shuffle.partitions, for which the documentation states:

Configures the number of partitions to use when shuffling data for joins or aggregations.

Example (where spark is a working SparkSession):

spark.conf.set("spark.sql.shuffle.partitions", 500)

Additionally, I can set spark.default.parallelism, for which the Execution Behavior documentation states:

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

Example (where spark is a working SparkSession):

spark.conf.set("spark.default.parallelism", 500)