
I'm trying to tune Spark performance by repartitioning a Spark DataFrame. Here is the code:

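from pyspark.sql import functions as func

# Read the first 15 parquet paths, keep the needed columns, filter on one organization
file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
    .where(func.col("organization") == organization)
df = df.repartition(10)
# Execute an action just to make Spark run the repartition step
df.first()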

During the execution of first() I checked the job stages in the Spark UI, and here is what I found: [screenshot: Job details] [screenshot: Stage 7 details]

  • Why is there no repartition step in the stage?
  • Why is there also a stage 8? I only requested one action, first(). Is it because of the shuffle caused by the repartition?
  • Is there a way to change the partitioning of the parquet files without having to resort to such operations? When I first read the df, it is split over 43k partitions, which is a lot compared to its size (4 MB with 13k rows when I save it to a CSV file) and causes problems in later steps. That is why I wanted to repartition it.
  • Should I use cache() after the repartition, i.e. df = df.repartition(10).cache()? When I executed df.first() a second time, I again got a scheduled stage with 43k partitions, even though df.rdd.getNumPartitions() returned 10.

EDIT: the number of partitions (10) is just a trial value. My questions are meant to help me understand how to repartition correctly.

Note: the DataFrame is initially read from a selection of parquet files in Hadoop.

I have already read this as a reference: How does Spark partition(ing) work on files in HDFS?

What is your spark.default.parallelism? And how many partitions are in your parquet file? - RanP
I didn't understand the second question. If you mean the size of my parquet file, I don't know how to check that. As for default.parallelism, it is not set, so the default applies: the total number of cores on all executor nodes or 2, whichever is larger. The allocation of CPU cores is dynamic. - SarahData
You can see the number of partitions in your parquet file from the number of "partXXX" files in its directory in HDFS. This is the starting number of partitions you will have after reading the file. You can always do an rdd.coalesce(10) after reading the file. - RanP
Do you know of a command line that can help me count the "partXXX" files? I did a quick Google search and couldn't find one. - SarahData
If the parquet file has 50 partitions, you'll have files part-0000 through part-0049. So you just ls the directory and sort by file name. - RanP
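
To make the "count the part files" suggestion concrete, here is a minimal sketch in Python. It assumes you already have the directory listing as a list of file names or paths (for example from os.listdir on a local copy, or parsed from the output of an HDFS listing); the helper name count_part_files is made up for this illustration. Treat the result as a rough guide only: for DataFrame reads, Spark may pack small files together or split large ones, so the number of read partitions does not have to equal the number of files.

```python
import re

# Matches data files such as "part-00000-<uuid>.snappy.parquet" or "part-0049".
# Marker files like "_SUCCESS" and hidden ".part-....crc" files do not match.
PART_FILE = re.compile(r"^part-\d+")


def count_part_files(file_names):
    """Count entries whose base name looks like a part file.

    `file_names` is any iterable of names or slash-separated paths
    (hypothetical input: a directory listing you collected yourself).
    """
    count = 0
    for name in file_names:
        base_name = name.rsplit("/", 1)[-1]  # drop any directory prefix
        if PART_FILE.match(base_name):
            count += 1
    return count
```

With a local copy of the directory, count_part_files(os.listdir(path)) gives the file count directly.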

1 Answer

  • Whenever there is a shuffle, there is a new stage. The
    repartition causes a shuffle, which is why you have two stages.
  • Caching is useful when you will use the DataFrame multiple times: it avoids reading and recomputing it for every action.
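
As a rough illustration of the caching point, the sketch below assumes df is the already filtered (and repartitioned) PySpark DataFrame from the question; the helper name cache_and_materialize is invented here. cache() on its own is lazy, and first() only needs to compute enough partitions to return one row, so a full action such as count() is used to populate the cache completely.

```python
def cache_and_materialize(df):
    """Mark a PySpark DataFrame for caching and force it to be computed once.

    Assumption: `df` is small enough to fit in executor memory, as the
    4 MB result described in the question would be.
    """
    cached = df.cache()  # lazy: only marks the DataFrame for caching
    cached.count()       # full action: computes every partition and stores it
    return cached
```

After df = cache_and_materialize(df.repartition(10)), later actions such as df.first() should be served from the 10 cached partitions instead of rescanning the parquet files.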

Use coalesce instead of repartition. When you only reduce the number of partitions, coalesce merges existing partitions rather than redistributing every row, so it avoids a full shuffle.
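
A minimal sketch of that choice, assuming df is a PySpark DataFrame; the helper name reduce_partitions and its full_shuffle flag are hypothetical and only there to show the two options side by side. One caveat worth knowing: because coalesce adds no shuffle boundary, the upstream read and filter also run with the reduced number of tasks, so repartition can still be the better option when the scan itself is expensive.

```python
def reduce_partitions(df, target, full_shuffle=False):
    """Return `df` with at most `target` partitions.

    coalesce() merges existing partitions without a full shuffle;
    repartition() redistributes all rows evenly at the cost of a shuffle.
    """
    current = df.rdd.getNumPartitions()
    if target >= current:
        # coalesce() cannot increase the partition count, so leave df as-is.
        return df
    if full_shuffle:
        return df.repartition(target)
    return df.coalesce(target)
```

For the DataFrame in the question this would be called as df = reduce_partitions(df, 10).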