I'm trying to tune the performance of Spark by repartitioning a Spark DataFrame. Here is the code:

```python
from pyspark.sql import functions as func

file_path1 = spark.read.parquet(*paths[:15])

df = file_path1.select(columns) \
    .where(func.col("organization") == organization)
df = df.repartition(10)

# execute an action just to make Spark execute the repartition step
df.first()
```
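For reference, this is how I check the number of partitions at each step (a small sketch using the same `paths`, `columns`, and `spark` session as above):

```python
# sketch: inspect the partition count before and after the repartition
print(file_path1.rdd.getNumPartitions())  # ~43k partitions right after the read
print(df.rdd.getNumPartitions())          # 10 after the repartition
```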
During the execution of `first()` I check the job stages in the Spark UI, and here is what I find:
- Why is there no `repartition` step in the stage?
- Why is there also stage 8? I only requested one action, `first()`. Is it because of the shuffle caused by the `repartition`?
- Is there a way to change the partitioning of the parquet files without having to resort to such operations? Initially, when I read the `df`, you can see that it is partitioned over 43k partitions, which is really a lot (compared to its size when I save it to a CSV file: 4 MB with 13k rows) and is creating problems in further steps; that's why I wanted to repartition it.
- Should I use `cache()` after the repartition, i.e. `df = df.repartition(10).cache()`? When I executed `df.first()` a second time, I also got a scheduled stage with 43k partitions, even though `df.rdd.getNumPartitions()` returned 10. (The exact sequence I tried is in the snippet after this list.)

EDIT: the number of partitions (10) is just for testing; my questions are meant to help me understand how to do the repartitioning correctly.
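For completeness, this is the sequence I tried for the last point (a minimal sketch; `df` is the DataFrame built above):

```python
# sketch of what I tried for the cache() question
df = df.repartition(10).cache()

df.first()                        # first action after repartition + cache
print(df.rdd.getNumPartitions())  # returns 10

df.first()                        # second action: the Spark UI still shows a scheduled stage with ~43k tasks
```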
Note: initially the DataFrame is read from a selection of parquet files in Hadoop.

I have already read How does Spark partition(ing) work on files in HDFS? as a reference. It says the default parallelism is the "total number of cores on all executor nodes or 2, whichever is larger", but in my case the allocation of CPU cores is dynamic.
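In case it matters, this is how I check the parallelism defaults on my cluster (a small sketch, assuming the same live SparkSession `spark` as above):

```python
# sketch: check the parallelism defaults on the running cluster
print(spark.sparkContext.defaultParallelism)           # total cores on all executors, or 2, whichever is larger
print(spark.conf.get("spark.sql.shuffle.partitions"))  # number of partitions used after a shuffle (200 by default)
```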