3
votes

Using, Spark 2.3 on EMR, I'm doing ETL and writing results in scala using dataframe.write.partitionyBy("column1").parquet("location") to write temporary results.

i then read my temporary data into a new frame and add another column to the data set. i write my final results using the below. i add bucketBy and sortBy to sort in order to increase performance on queries on "column2" which is commonly used for joins and other filters .

newdataframe.write.partitionBy("column1").bucketBy(1,"column2").sortBy("column2").option("path","location").saveAsTable("tablename").

the first line gives me 200 parts per partition, each one being 770mb. the 2nd gives me 200 parts per partition, each one 192mb.

Both data sets produce the same metrics (sum of column4), and almost the same number of rows (<0.1% difference).

Why is one result so much smaller than the other even though its the same parquet format, 2nd dataset has 1 more column, and same partition column?

Appreciate any help.

1

1 Answers

6
votes

Since you are writing the data in parquet, while saving with sorting the data, parquet dictionary will coding will kick in.

Parquet column oriented storage does Run length encoding with dictionary. That means lets say we have a long sequence of repeated word like

spark spark spark ... 1000 times. Now instead of storing this you can create a dictionary saying

spark --> 1

And represent the train of spark as : 1, 1000.

This reduces the space drastically.

However if in a row group (A logical horizontal partitioning of the data into rows. There is no physical structure that is guaranteed for a row group. A row group consists of a column chunk for each column in the dataset.) there is no such duplication of data present it won't do any dictionary optimization resulting higher size of the parquet files.

In your case since you are sorting the data, all the data with same value will come together in a single row group and parquet will be able to do dictionary optimization.

Parquet stores the compression logic in the footer. You can use the following code to print the footer for 2 of your different cases and compare the encoding and the compression.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

object ParquetPrintFooter {

  def main (args: Array[String]) : Unit = {
  val config = new Configuration()
  val filePath = new Path("/data/test.par");
  val fileStatus = 
  FileSystem.get(config).getFileStatus(filePath)
  val parquetMetadata = 
  ParquetFileReader.readFooter(config, fileStatus, 
  ParquetMetadataConverter.NO_FILTER)
  println(parquetMetadata)
}

}