0
votes

I have written a dataframe to a parquet file using spark that has 100 sub directory (each sub directory contains one files) on HDFS. This file has 100GB .

when I repartition the dataframe to 10 partition and write it to HDFS, the size of the output parquet files increases to about 200GB. why this is happend? what is the optimum number of partition when writing to a parquet file?

My question is diffrent from this question and I think It's not duplicate. That question maybe answer first part of my question although that's not the same (why is this happend?) but my main question is: what is the optimum number of partition when writing to a parquet file?

1
It's not duplicate. That question maybe answer first part of my question although that's not the same (why is this happend?) but my main question is: what is the optimum number of partition when writing to a parquet file?david_js

1 Answers

2
votes

It all comes down to use. It comes in two flavors is there a logical identifier in my data that will be consitently be searched upon for use or do I just care about file efficientcy.

(1) Logical identifier, if your data has a column(s) that are being used consitently (i.e. transaction time or input time) you can parttion along those lines, this will allow for your process to quickly parse the data allowing quicker query time. The downside to partitioning is that going over 2K is known to break technologies like Impala so don't go too crazy.

(2) Size partitioning, if you are looking at just optimizing file size for movement around the environment and other services/tools. I would advise trying to set the data size to 128MB per partition. This will allow for quicker movement over other tool that might have issues processing a series of smaller files (i.e. AWS S3). Below is some code for setting your partitions based on data size.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.DataFrame 
import org.apache.spark.util.SizeEstimator 

val inputDF2 : Long = SizeEstimator.estimate(inputDF.rdd) 
//find its appropiate number of partitions 
val numPartitions : Long = (inputDF2/134217728) + 1 
//write it out with that many partitions  
val outputDF = inputDF.repartition(numPartitions.toInt) 

Without knowing your data I cannot tell you if it would be better to partition by logical identified, by byte size, or a combination of both. I hope I gave you enough information to help you figure out what you want to do.