4
votes

I have 160 GB of data, partitioned on the DATE column and stored in Parquet file format, running on Spark 1.6.0. I need to store the output Parquet files with equal-sized files in each partition, with a fixed size, say 100 MB each.

I tried the below code:

  val blockSize = 1024 * 1024 * 100
  sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize)
  sc.hadoopConfiguration.setInt("parquet.block.size", blockSize)

  df1.write.partitionBy("DATE").parquet("output_file_path")

The above configuration is not working: it creates multiple files with the default number of partitions, not 100 MB files.

As far as I know, that property is only an upper bound, not a fixed size. And Spark will always save each partition separately until you repartition the DataFrame. – OneCricketeer
Is there any other way to store equal-sized files, instead of using repartition? My data is not equally partitioned: a few partitions contain more data and others contain only a few MBs. – warner
Then you'll never get equally sized files anyway. I don't understand the purpose of this... In fact, HDFS and S3 actually prefer larger files for processing. – OneCricketeer
Possible duplicate. Maybe this is what you are looking for: stackoverflow.com/questions/39187622/… – Gladiator

2 Answers

6
votes

It's not possible to get exactly the same size for every file, but you can give Spark enough hints to keep them "within" a certain size. The general goal is to make each file equal to the HDFS block size, with each file holding one (or more) row group. You want each row group to fit in one HDFS block. If a row group does not fit in one block, additional network calls need to be made to read another HDFS block in order to read the row group completely.

To achieve this, do the following:

  • Set spark.sql.files.maxPartitionBytes in spark conf to 256 MB (equal to your HDFS block size)
  • Set parquet.block.size on the parquet writer options in Spark to 256 MB.

  tradesDF.write
    .option("parquet.block.size", 256 * 1024 * 1024)
    .parquet("output_file_path")
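For completeness, the first bullet is set on the Spark configuration rather than on the writer. A minimal sketch, with the caveat that `spark.sql.files.maxPartitionBytes` was introduced in Spark 2.0, so it is not available on the asker's 1.6.0 (the app name here is just a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("equal-size-parquet")  // placeholder name
  // Read-side hint: cap each input partition at 256 MB (matches the HDFS block size)
  .config("spark.sql.files.maxPartitionBytes", 256 * 1024 * 1024)
  .getOrCreate()
```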

1
vote

You can try the following approach:

First, estimate the size of a single row in your data.
It's difficult to do accurately (since a Parquet file contains metadata as well), but you can take 1000 rows of your data, write them to a file, and estimate the size of a single row from that.

From that, calculate how many rows will fit in 100 MB:

N = 100MB / size_of_row 
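As a concrete illustration of that arithmetic (the sample size of ~50 KB for 1000 rows is a made-up number, not from the question):

```scala
// Hypothetical measurement: a 1000-row sample written to Parquet came out at ~50 KB.
val sampleBytes: Long = 50L * 1024               // 51200 bytes for the sample file
val sampleRows: Long  = 1000L
val bytesPerRow: Long = sampleBytes / sampleRows // ~51 bytes per row

val targetBytes: Long = 100L * 1024 * 1024       // 100 MB target file size
val N: Long = targetBytes / bytesPerRow          // ~2,056,031 rows per output file
```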

Now you can create an additional column with a bucket id for each row:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.types.IntegerType

// row_number (rather than rank) gives a sequential number per row; Spark requires
// the window to be ordered, so order by some column within each DATE partition.
val df2 = df.withColumn("bucket",
  (row_number().over(Window.partitionBy("DATE").orderBy("DATE")) / N).cast(IntegerType))

And now you can repartition your data by both the date and the bucket:

df2
  .repartition($"DATE", $"bucket")
  .drop("bucket")
  .write
  .parquet(...)