Plenty of questions have already been posed about the number of Spark tasks and how it relates to the number of partitions, but somehow I cannot understand the following case.

I have a Hive table (backed by an HDFS folder) that contains 160 compressed Parquet files. The files are mostly well balanced: the smallest is 7.5 MB, the largest 49.2 MB. In the HDFS browser I can see that each file fits within one (non-full) HDFS block (128 MB).

The cluster has the following properties: 10 machines, 1 master and 9 workers. Each machine has 6 cores (12 virtual cores). I am using YARN. Moreover:

spark.executor.cores = 6

Now I create the following DataFrame:

val myDF = spark.sql("SELECT * FROM myHiveTable WHERE myCol='someValue'")

Even before the job is triggered, it is possible to know in advance that:

myDF.rdd.partitions.size

returns 60.

To trigger the job one needs an action, so I write myDF to HDFS. The job indeed runs with 42 executors and 60 tasks.
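
Concretely, the write action looks roughly like this (the output path is just a placeholder):

myDF.write.mode("overwrite").parquet("hdfs:///tmp/myDF_output")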

My questions:

  1. If I started with 160 files (which I expected to give 160 partitions), how come I ended up with 60?

  2. If I have 60 tasks and 10 machines, then I would optimally need only 10 executors (I have read that each executor can run as many tasks in parallel as it has cores, which in my case is 6; see the back-of-the-envelope arithmetic after this list). I know this would only happen if the dataset were perfectly balanced across the datanodes, but 42 executors seems far away from 10. Or is my reasoning wrong?

  3. How can Spark know in advance, even before running the query, that this will result in 60 partitions?
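
For question 2, the back-of-the-envelope arithmetic I have in mind is simply:

// not Spark code, just my reasoning:
// tasks per executor = spark.executor.cores = 6
// executors needed   = 60 tasks / 6 tasks per executor = 10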

Thank you!

For the record, the level of parallelism with Parquet files is the number of "row blocks", i.e. possibly more than the number of files (or fewer, if you have empty files). – Samson Scharfrichter
@SamsonScharfrichter Can you clarify your comment, please? – thebluephantom
Any progress here? – thebluephantom

1 Answer

I think this might be the answer to questions 1 and 3.

Apparently, if I read the Hive table (which is actually a folder) as an RDD, I also end up with the same number of partitions (60). The number of partitions of an RDD is determined by the specific InputFormat. Source: Spark RDD default number of partitions
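
Something along these lines shows the comparison (the warehouse path is just a placeholder, and I am using the Parquet reader here rather than a raw Hadoop RDD for brevity):

val fromHive  = spark.sql("SELECT * FROM myHiveTable")
val fromFiles = spark.read.parquet("/user/hive/warehouse/myhivetable")
println(fromHive.rdd.getNumPartitions)   // 60 in my case
println(fromFiles.rdd.getNumPartitions)  // also 60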

So, prompted by the comment from @Samson Scharfrichter, I searched for what the behavior is for Parquet and found this: https://github.com/animeshtrivedi/notes/wiki/Parquet-partition-calculation

Min(defaultMinSplitSize (128MB, `spark.sql.files.maxPartitionBytes`),
    Max(openCostInByte (8MB, `spark.sql.files.openCostInBytes`),
        totalSize/defaultParallelism))

So maybe this formula explains why there are 60 partitions, and how this number can be calculated in advance.
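
As a rough sanity check, the formula can be turned into a small helper (this is my own sketch and helper name, not Spark's internal code; the defaults simply mirror the values quoted above):

def estimateMaxSplitBytes(totalSizeBytes: Long,
                          defaultParallelism: Int,
                          maxPartitionBytes: Long = 128L * 1024 * 1024, // spark.sql.files.maxPartitionBytes
                          openCostInBytes: Long = 8L * 1024 * 1024      // spark.sql.files.openCostInBytes (value quoted above)
                         ): Long =
  math.min(maxPartitionBytes, math.max(openCostInBytes, totalSizeBytes / defaultParallelism))

// Rough usage, with the real totals from the cluster:
// val totalSize   = ...  // sum of the sizes of the 160 files, in bytes
// val parallelism = spark.sparkContext.defaultParallelism
// val splitBytes  = estimateMaxSplitBytes(totalSize, parallelism)
// val estimatedPartitions = math.ceil(totalSize.toDouble / splitBytes).toLong  // hopefully close to 60 for my data

Note that Spark also packs whole file splits together into partitions, so the real count can deviate a bit from this estimate.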