2
votes

I inspected the output Parquet file of a Spark job that always breaks because of Out of Memory errors. I use Spark 1.6.0 on Cloudera 5.13.1.

I noticed that the Parquet row group sizes are uneven. The first and the last row groups are huge, while the rest are really small.

Shortened output from parquet-tools (RC = row count, TS = total size):

row group 1:                RC:5740100 TS:566954562 OFFSET:4  
row group 2:                RC:33769 TS:2904145 OFFSET:117971092  
row group 3:                RC:31822 TS:2772650 OFFSET:118905225  
row group 4:                RC:29854 TS:2704127 OFFSET:119793188  
row group 5:                RC:28050 TS:2356729 OFFSET:120660675  
row group 6:                RC:26507 TS:2111983 OFFSET:121406541  
row group 7:                RC:25143 TS:1967731 OFFSET:122069351  
row group 8:                RC:23876 TS:1991238 OFFSET:122682160  
row group 9:                RC:22584 TS:2069463 OFFSET:123303246  
row group 10:               RC:21225 TS:1955748 OFFSET:123960700  
row group 11:               RC:19960 TS:1931889 OFFSET:124575333  
row group 12:               RC:18806 TS:1725871 OFFSET:125132862  
row group 13:               RC:17719 TS:1653309 OFFSET:125668057  
row group 14:               RC:1617743 TS:157973949 OFFSET:134217728

Is this a known bug? How can I set the parquet block size (row group size) in Spark?

EDIT:
What the Spark application does: it reads a big Avro file, distributes the rows by two partition keys (using distribute by <part_keys> in the select), and then writes a Parquet file for each partition using:
DF.write.partitionBy(<part_keys>).parquet(<path>)
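
Roughly, the job looks like this (a simplified sketch of what I described above; the table name, column names, and paths are placeholders standing in for the real <part_keys> and <path>):

// Simplified sketch of the job (Spark 1.6, Scala); all names and paths below are placeholders
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// read the big Avro input (spark-avro package)
val avroDF = hiveContext.read.format("com.databricks.spark.avro").load("/input/data.avro")
avroDF.registerTempTable("input_data")
// distribute the rows by the two partition keys
val df = hiveContext.sql("SELECT * FROM input_data DISTRIBUTE BY part_key1, part_key2")
// write one Parquet directory per partition value
df.write.partitionBy("part_key1", "part_key2").parquet("/output/path")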

I used 13 executors. Is it possible that the node-local rows go into the big row group and the remote reads from each executor go into a separate row group? – Joha
Were you able to fix your issue or find a workaround? – cheseaux
No, I was not able to find a workaround yet. – Joha

2 Answers

1
vote

Your RDD could be unevenly partitioned. The number of rows in each row group is related to the size of the corresponding partition of your RDD.

When an RDD is created, each partition contains roughly the same amount of data (due to the HashPartitioner). After the Spark job has processed the data, one partition can end up with more data than another, for example because a filter transformation removed more rows from one partition than from another. The partitions can be rebalanced by calling repartition before writing the Parquet file, as in the sketch below.
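
A minimal sketch with the Spark 1.6 Scala API (the DataFrame name, partition columns, target partition count, and output path are placeholders, not taken from the question):

// rebalance the data before writing; 200 is only an example partition count
val balanced = df.repartition(200)
balanced.write.partitionBy("part_key1", "part_key2").parquet("/output/path")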

EDIT: If the problem is not related to the partitioning, reducing the row group size might help:

sc.hadoopConfiguration.setInt("parquet.block.size", blockSize)
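
For example, to request 64 MB row groups instead of the Parquet default of 128 MB (the value is in bytes; 64 MB is only an illustrative choice, and the writer treats it as a target rather than a hard limit):

// set on the SparkContext's Hadoop configuration before the Parquet write; value is in bytes
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)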
1
vote

There is a known bug for this: PARQUET-1337.