4
votes

I have an HDFS folder with two 250MB Parquet files. The Hadoop dfs block size is set to 128MB. With the following code:

    JavaSparkContext sparkContext = new JavaSparkContext();

    SQLContext sqlContext = new SQLContext(sparkContext);
    // Read every Parquet file under the folder into one DataFrame
    DataFrame dataFrame = sqlContext.read().parquet("hdfs:////user/test/parquet-folder");
    // Log how many partitions the underlying RDD ends up with
    LOGGER.info("Nr. of rdd partitions: {}", dataFrame.rdd().getNumPartitions());

    sparkContext.close();

I run it on the cluster with spark.executor.instances=3 and spark.executor.cores=4, and I can see that the reading of the Parquet files is split into 3 executors × 4 cores = 12 tasks:

    spark.SparkContext: Starting job: parquet at VerySimpleJob.java:25
    scheduler.DAGScheduler: Got job 0 (parquet at VerySimpleJob.java:25) with 12 output partitions

However, when I inspect the DataFrame's RDD (or create one with the toJavaRDD() call), I get only 4 partitions. Is this controlled by the HDFS block size - 2 blocks per file, hence 4 partitions?

Why exactly doesn't this match the number of partitions from the parquet (parent?) operation?

1
Answered below, but overall you're right - it's all about HDFS block size. – Zyoma
Based on @Zyoma's suggestions, I've updated the code to try to force smaller splits, which should give more input partitions for the DataFrame. The following configurations were changed: parquet.block.size, mapred.max.split.size and mapred.min.split.size, all set to Long.toString(8 * 1024 * 1024L) (see the sketch after these comments). This still gives me back 4 partitions. – cristi.calugaru
How did you get more partitions after the toJavaRDD() call? – Xiawei Zhang
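
For reference, a minimal sketch of how the split-size settings from cristi.calugaru's comment above could be applied before the read. The property names are taken verbatim from the comment; whether they actually influence the Parquet input splits depends on the Spark/Parquet version, so treat this as an illustration rather than a confirmed fix:

    JavaSparkContext sparkContext = new JavaSparkContext();

    // Apply the split-related settings mentioned in the comment above
    String eightMb = Long.toString(8 * 1024 * 1024L);
    sparkContext.hadoopConfiguration().set("parquet.block.size", eightMb);
    sparkContext.hadoopConfiguration().set("mapred.max.split.size", eightMb);
    sparkContext.hadoopConfiguration().set("mapred.min.split.size", eightMb);

    SQLContext sqlContext = new SQLContext(sparkContext);
    DataFrame dataFrame = sqlContext.read().parquet("hdfs:////user/test/parquet-folder");
    LOGGER.info("Nr. of rdd partitions: {}", dataFrame.rdd().getNumPartitions());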

1 Answer

3
votes

When you're reading a file with Spark, neither the number of executors nor the number of cores affects the number of tasks in any fashion. The number of partitions (and, as a result, of tasks) is determined only by the number of blocks in your input. If you have 4 files that are each smaller than the HDFS block size, that'd be 4 blocks anyway and therefore 4 partitions. The formula is number_of_files * number_of_blocks_in_file. So look into your folder, count how many files it contains and what the size of each file is. That should answer your question.
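
Applied to the numbers in the question (two ~250MB files, 128MB HDFS blocks), that formula gives exactly the 4 partitions observed. A quick sketch of the arithmetic, where the ceiling division only approximates the real split logic:

    // Two ~250MB Parquet files on HDFS with a 128MB block size (from the question)
    long blockSize = 128L * 1024 * 1024;
    long fileSize  = 250L * 1024 * 1024;
    long numFiles  = 2;

    // ceil(250 / 128) = 2 blocks per file
    long blocksPerFile = (fileSize + blockSize - 1) / blockSize;

    // number_of_files * number_of_blocks_in_file = 2 * 2 = 4 partitions
    long expectedPartitions = numFiles * blocksPerFile;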

UPD: everything above is true provided you didn't manually repartition your DataFrame and your DataFrame wasn't created as the result of a join or any other shuffle operation.
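
If you need more downstream parallelism than the block count gives you, one option (not something the question tried, so purely a suggestion) is to repartition explicitly after the read, at the cost of a shuffle:

    // Explicitly repartition after the read; this triggers a shuffle
    DataFrame repartitioned = dataFrame.repartition(12);
    LOGGER.info("Nr. of rdd partitions after repartition: {}",
            repartitioned.rdd().getNumPartitions());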
