2 votes

Suppose we have a PySpark dataframe with data spread evenly across 2048 partitions, and we want to coalesce to 32 partitions to write the data back to HDFS. Using coalesce is nice for this because it does not require an expensive shuffle.

But one of the downsides of coalesce is that it typically results in an uneven distribution of data across the new partitions. I assume that this is because the original partition IDs are hashed to the new partition ID space, and the number of collisions is random.

However, in principle it should be possible to coalesce evenly, so that the first 64 partitions from the original dataframe are sent to the first partition of the new dataframe, the next 64 are sent to the second partition, and so on, resulting in an even distribution of partitions. The resulting dataframe would often be more suitable for further computations.

Is this possible, while preventing a shuffle?

I can force the relationship I would like between the initial and final partitions using a trick like the one in this question, but Spark doesn't know that everything from each original partition is going to one particular new partition. Thus it can't optimize away the shuffle, and it runs much slower than coalesce.
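For reference, a minimal sketch of the plain coalesce path (the dataframe and HDFS path below are placeholders, not my actual job):

```python
# Minimal sketch: spark.range stands in for the real 2048-partition dataframe,
# and the HDFS output path is hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("coalesce-sketch").getOrCreate()

df = spark.range(0, 2_048_000, numPartitions=2048)   # placeholder for the real data

coalesced = df.coalesce(32)                  # narrow dependency, no shuffle
print(coalesced.rdd.getNumPartitions())      # 32

# An explicit partition mapping would need repartition(), which forces a full shuffle:
# df.repartition(32).write.parquet("hdfs:///tmp/output")
coalesced.write.mode("overwrite").parquet("hdfs:///tmp/output")  # hypothetical path
```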

1
Thanks for editing the answer and sorry for the misunderstanding. Why do you assume that coalescing will spread the data unevenly? If the current number of partitions is a multiple of the desired one, I would expect each new partition to have an even number of upstream partitions in its lineage. Does this make sense to you? I'll check the code anyhow. - stefanobaghino
Checked and added a new answer: you can safely coalesce without triggering a shuffle while retaining the evenness. :) - stefanobaghino

1 Answer

1 vote

In your case you can safely coalesce the 2048 partitions into 32 and assume that Spark will evenly assign the upstream partitions to the coalesced ones (64 for each).

Here is an extract from the Scaladoc of RDD#coalesce:

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
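You can check this empirically; here is a sketch with a synthetic dataframe, using spark_partition_id to tag each row with the partition it lands in after the coalesce:

```python
# Sketch: count rows per coalesced partition to confirm the even 64-to-1 grouping.
# The dataframe here is synthetic (1000 rows in each of 2048 partitions).
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id, count

spark = SparkSession.builder.getOrCreate()

df = spark.range(0, 2_048_000, numPartitions=2048)
coalesced = df.coalesce(32)

sizes = (coalesced
         .groupBy(spark_partition_id().alias("partition_id"))
         .agg(count("*").alias("rows"))
         .orderBy("partition_id"))

sizes.show(32)   # each of the 32 partitions should hold roughly 64,000 rows
```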

Consider also that how your partitions are physically spread across the cluster influences the way coalescing happens. The following is an extract from CoalescedRDD's ScalaDoc:

If there is no locality information (no preferredLocations) in the parent, then the coalescing is very simple: chunk parents that are close in the Array in chunks. If there is locality information, it proceeds to pack them with the following four goals:

(1) Balance the groups so they roughly have the same number of parent partitions

(2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer

(3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)

(4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
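Locally (where there is no locality information), the simple contiguous chunking is easy to observe; a small sketch:

```python
# Small local sketch: with no preferredLocations, coalesce groups contiguous parent partitions.
# Each parent partition of the parallelized range holds exactly one known element.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(8), numSlices=8)   # parent partition i holds element i
grouped = rdd.coalesce(2).glom().collect()    # glom() lists each coalesced partition's contents

print(grouped)   # expected: [[0, 1, 2, 3], [4, 5, 6, 7]]
```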