coalesce(n, shuffle = true), which is equivalent to repartition(n), may have a considerable effect on how your job performs, depending on the mapping or any other processing logic you have in your parent RDDs.
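In the RDD API, repartition(n) in fact just delegates to coalesce(n, shuffle = true) under the hood. You can verify the equivalence in a spark-shell session; the sketch below only checks partition counts and is not tied to your job:

// both produce the same partitioning: repartition(n) delegates to
// coalesce(n, shuffle = true) in the RDD API
val rdd = sc.parallelize(0 to 1000000, 10)
rdd.repartition(2).getNumPartitions              // 2
rdd.coalesce(2, shuffle = true).getNumPartitions // 2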
In general, when the data in your parent partitions is evenly distributed and you are not drastically decreasing the number of partitions, you should avoid using shuffle when using coalesce.
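For instance, a mild reduction such as the following (the numbers are illustrative) keeps a narrow dependency: each output partition simply reads a few parent partitions, and no shuffle stage is introduced:

// mild reduction: 10 -> 5 partitions, no shuffle stage;
// Spark merges co-located parent partitions into each output partition
sc.parallelize(0 to 1000000, 10)
  .coalesce(5)
  .getNumPartitions // 5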
However, in your case this is a substantial reduction in the number of partitions and, as per the documentation:
> However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
> this may result in your computation taking place on fewer nodes than
> you like (e.g. one node in the case of numPartitions = 1). To avoid this,
> you can pass shuffle = true. This will add a shuffle step, but means the
> current upstream partitions will be executed in parallel (per whatever
> the current partitioning is).
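You can see the extra shuffle step the documentation mentions directly in the RDD lineage; this is a quick spark-shell check, and the exact RDD names in the output depend on your Spark version:

val rdd = sc.parallelize(0 to 1000000, 10)
// no shuffle: a single stage, CoalescedRDD directly over the parent
println(rdd.coalesce(1).toDebugString)
// shuffle = true: an extra ShuffledRDD appears, i.e. a stage boundary
println(rdd.coalesce(1, shuffle = true).toDebugString)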
Given that, you now need to assess and choose between:
- Shuffling potentially huge amounts of data but doing the computations in the parent partitions in parallel
- Collecting all the partitions into one without a full reshuffle (there will of course still be some data movement) but doing the computations within a single task
For example, consider the following snippets, which are far from the actual logic you may have but will give you a perspective on what is happening:
// fast: with shuffle = true the mapPartitions work runs in 10 parallel
// upstream tasks before the shuffle down to a single partition
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => { Thread.sleep(5000); it.map(_.toString) }) // simulate expensive per-partition work
  .coalesce(1, shuffle = true)
  .toDF.write.text("shuffleTrue")

// slow: with shuffle = false the 10 parent partitions collapse into one
// task, so the 5-second sleeps run sequentially on a single executor
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => { Thread.sleep(5000); it.map(_.toString) })
  .coalesce(1, shuffle = false)
  .toDF.write.text("shuffleFalse")
On my cluster, the variant with shuffle = true showed a total time of roughly 5 seconds, with 10 tasks performing the computation logic on each parent partition in parallel. The variant with shuffle = false took roughly 50 seconds, doing all the computation within a single task on one executor.
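As a side note, if you do the same thing through the DataFrame/Dataset API, Dataset.coalesce takes no shuffle flag; the usual way to get the shuffle = true behaviour there is to call repartition instead. A minimal sketch, with the output paths being placeholders:

// DataFrame equivalent: repartition(1) adds an exchange (parallel upstream
// work), coalesce(1) does not (single task), mirroring the RDD behaviour
val df = spark.range(0, 1000000).selectExpr("cast(id as string) as value")
df.repartition(1).write.text("dfShuffle") // like coalesce(1, shuffle = true)
df.coalesce(1).write.text("dfNoShuffle")  // like coalesce(1, shuffle = false)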