3 votes

I am learning Spark, and when I tested the repartition() function in the PySpark shell with the expressions below, I observed a very strange result: all elements end up in the same partition after repartition(). (I used glom() here to inspect the partitioning within the RDD.) I was expecting repartition() to shuffle the elements and distribute them randomly among the partitions. This collapse only happens when the new number of partitions is <= the original number of partitions.

In my tests, if I set the new number of partitions > the original number, I still don't observe any shuffling. Am I doing anything wrong here?

In [1]: sc.parallelize(range(20), 8).glom().collect()
Out[1]:
[[0, 1],
 [2, 3],
 [4, 5],
 [6, 7, 8, 9],
 [10, 11],
 [12, 13],
 [14, 15],
 [16, 17, 18, 19]]

In [2]: sc.parallelize(range(20), 8).repartition(8).glom().collect()
Out[2]:
[[],
 [],
 [],
 [],
 [],
 [],
 [2, 3, 6, 7, 8, 9, 14, 15, 16, 17, 18, 19, 0, 1, 12, 13, 4, 5, 10, 11],
 []]

In [3]: sc.parallelize(range(20), 8).repartition(10).glom().collect()
Out[3]:
[[],
 [0, 1],
 [14, 15],
 [10, 11],
 [],
 [6, 7, 8, 9],
 [2, 3],
 [16, 17, 18, 19],
 [12, 13],
 [4, 5]]

I am using Spark version 2.1.1.


2 Answers

3 votes

Congratulations! You just rediscovered SPARK-21782 - Repartition creates skews when numPartitions is a power of 2:

Currently, the algorithm for repartition (shuffle-enabled coalesce) is as follows:

for each initial partition index, generate position as (new Random(index)).nextInt(numPartitions); then, for element number k in that initial partition, put it in new partition (position + k) modulo numPartitions.

So, essentially elements are smeared roughly equally over numPartitions buckets - starting from the one with number position+1.
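To make the quoted algorithm concrete, here is a minimal Python paraphrase of that placement rule (my own sketch, not Spark's actual Scala code; rng_position is a hypothetical stand-in for (new Random(index)).nextInt(numPartitions)):

def assign_new_partitions(index, items, num_partitions, rng_position):
    # position is fixed per input partition index, as described above
    position = rng_position(index, num_partitions)
    # element number k of that partition goes to (position + k) % num_partitions,
    # so the elements fill consecutive buckets starting at position + 1
    for k, item in enumerate(items, start=1):
        yield ((position + k) % num_partitions, item)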

Note that a new instance of Random is created for every initial partition index, with a fixed seed index, and then discarded. So the position is deterministic for every index, for any RDD in the world. Also, the nextInt(bound) implementation has a special case when bound is a power of 2: it basically takes the several highest bits of the initial seed, with only minimal scrambling.
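You can see the power-of-2 effect with a small simulation. The sketch below re-implements just enough of java.util.Random to mimic nextInt(bound) for a power-of-two bound (the constants are the documented LCG parameters; the function name is mine). Running it for partition indexes 0..7 with bound 8 should give the same position for every index, which is consistent with the single non-empty partition in Out[2] above:

MULTIPLIER = 0x5DEECE66D   # LCG constants from java.util.Random
INCREMENT = 0xB
MASK = (1 << 48) - 1

def next_int_pow2(seed, bound):
    # Mimics java.util.Random(seed).nextInt(bound) when bound is a power of 2.
    s = (seed ^ MULTIPLIER) & MASK            # scrambling done in the constructor
    s = (s * MULTIPLIER + INCREMENT) & MASK   # one LCG step, i.e. next(31)
    next31 = s >> 17                          # top 31 bits of the new seed
    return (bound * next31) >> 31             # power-of-2 shortcut: only the highest bits survive

for index in range(8):                        # one Random(index) per initial partition
    print(index, next_int_pow2(index, 8))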

PySpark makes it worse, because it uses a batched serializer with a default batch size equal to 10, so with a small number of items in each partition, all of them are shuffled to the same output partition.

The good news is that this is already resolved in Spark 2.3, thanks to Sergey Serebryakov.
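If you are stuck on a pre-2.3 version, a hedged workaround (my own sketch, not part of the fix) is to shuffle on an explicit random key instead of relying on repartition():

import random

rdd = sc.parallelize(range(20), 8)
reshuffled = (rdd.map(lambda x: (random.randrange(8), x))  # attach a random key to each element
                 .partitionBy(8)                           # hash-partition on that key
                 .values())                                # drop the helper key

Alternatively, as your In [3] run suggests, repartitioning to a count that is not a power of 2 avoids the worst of the skew on these versions.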

0 votes

Ah, I think it has something to do with the underlying partitioning logic. I tried with a much bigger number of elements and now the result makes more sense.

In [95]: [len(lst) for lst in sc.parallelize(range(1000), 8).glom().collect()]
Out[95]: [125, 125, 125, 125, 125, 125, 125, 125]

In [96]: [len(lst) for lst in sc.parallelize(range(1000), 8).repartition(10).glom().collect()]
Out[96]: [95, 95, 100, 105, 95, 95, 100, 105, 105, 105]

In [97]: [len(lst) for lst in sc.parallelize(range(1000), 8).repartition(5).glom().collect()]
Out[97]: [190, 195, 205, 210, 200]