
I have an RDD with the following number of elements in each partition (the total number of partitions is val numPart = 32):

1351, 962, 537, 250, 80, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 88, 270, 635, 1028, 1388, 1509

To get the output above I use this:

import org.apache.spark.rdd.RDD

// Emits one element per partition: that partition's record count
def countByPartition[A](anRdd: RDD[A]): RDD[Int] = anRdd.mapPartitions(iter => Iterator(iter.length))

println(countByPartition(anRdd).collect.mkString(", "))

I would like each partition to have at least a minimum number of elements, given by val min = 5.

I've tried to perform anRdd.repartition(numPart) and I get the following:

257, 256, 256, 256, 255, 255, 254, 253, 252, 252, 252, 252, 252, 252, 252, 252, 251, 250, 249, 248, 248, 248, 248, 248, 261, 261, 260, 260, 259, 258, 258, 257

In this case it was perfect, because every partition had more than min elements. But it doesn't always produce the same distribution, and sometimes I get partitions with fewer than min elements.
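
As a sanity check, something like the following (a hypothetical helper that reuses countByPartition from above) reports whether every partition meets the minimum after a repartition:

// True only if no partition holds fewer than `min` elements
def allPartitionsAtLeast[A](anRdd: RDD[A], min: Int): Boolean =
  countByPartition(anRdd).collect.forall(_ >= min)

println(allPartitionsAtLeast(anRdd.repartition(numPart), min))  // not guaranteed to print true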

Is there a way to do what I want?


1 Answer


It is not possible; in general you need to choose your partitioning so that the sizes come out roughly even. Partitioners in Spark essentially implement two methods, numPartitions and getPartition. The latter is a function from a single key to a partition number, so the other elements, and therefore the eventual size of each partition, are not known at that point.
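
For illustration, here is a minimal sketch of that contract (the class name and modulo scheme are just examples): getPartition only ever sees one key at a time, so it cannot enforce a minimum partition size.

import org.apache.spark.Partitioner

// Assigns a key to a partition by hashing; no knowledge of other elements
class ModPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}

// Usage on a key-value RDD, e.g.: pairRdd.partitionBy(new ModPartitioner(numPart))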