I have an RDD with the following number of elements in each partition (the total number of partitions is val numPart = 32):
1351, 962, 537, 250, 80, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 88, 270, 635, 1028, 1388, 1509
To get the counts above, I use this:
import org.apache.spark.rdd.RDD
// Emits one value per partition: the number of elements it holds
def countByPartition[A](anRdd: RDD[A]): RDD[Int] = anRdd.mapPartitions(iter => Iterator(iter.length))
println(countByPartition(anRdd).collect().mkString(", "))
I would like each partition to contain at least a minimum number of elements, given by val min = 5.
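The requirement can be stated as a check on the collected counts. This is a minimal sketch in plain Scala with no Spark dependency; underfilled and meetsMinimum are hypothetical names, and in practice the counts would come from countByPartition(anRdd).collect().

```scala
object MinPartitionCheck {
  // Hypothetical helper: indices of partitions holding fewer than `min` elements.
  def underfilled(counts: Seq[Int], min: Int): Seq[Int] =
    counts.zipWithIndex.collect { case (count, idx) if count < min => idx }

  // The requirement holds when no partition is under-filled.
  def meetsMinimum(counts: Seq[Int], min: Int): Boolean =
    underfilled(counts, min).isEmpty
}
```

With the counts from the first output and min = 5, underfilled returns the indices 6 through 24, which are the 19 empty partitions.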
I've tried anRdd.repartition(numPart) and I get the following:
257, 256, 256, 256, 255, 255, 254, 253, 252, 252, 252, 252, 252, 252, 252, 252, 251, 250, 249, 248, 248, 248, 248, 248, 261, 261, 260, 260, 259, 258, 258, 257
In this case it worked, because every partition has more than min elements. But the result is not always the same, and sometimes some partitions end up with fewer than min elements.
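One plausible explanation, stated as an assumption rather than a description of Spark's exact code: repartition(n) is a shuffle-based coalesce in which each input partition's elements are dealt round-robin across the n outputs, starting from a pseudo-random offset chosen per input partition. The plain-Scala model below simulates that (scala.util.Random stands in for Spark's internal generator; simulate and guaranteedMinimum are hypothetical names). Under that model every output receives at least the sum over input partitions of n_i / numPart (integer division), so small or empty input partitions add nothing to the floor.

```scala
import scala.util.Random

object RoundRobinModel {
  // Simplified model of shuffle-based repartitioning (not Spark's code):
  // each input partition deals its elements round-robin across `numPart`
  // outputs, starting at a random offset.
  def simulate(inputCounts: Seq[Int], numPart: Int, seed: Long): Array[Int] = {
    val rng = new Random(seed)
    val out = Array.fill(numPart)(0)
    for (n <- inputCounts) {
      val start = rng.nextInt(numPart)
      for (k <- 0 until n) {
        out((start + k) % numPart) += 1
      }
    }
    out
  }

  // Lower bound valid for any start offsets: input partition i contributes
  // at least n_i / numPart elements to every output partition.
  def guaranteedMinimum(inputCounts: Seq[Int], numPart: Int): Int =
    inputCounts.map(_ / numPart).sum
}
```

For the counts in the first output and numPart = 32, guaranteedMinimum gives 248, the same as the smallest value after repartitioning. When the total is small relative to numPart, or most elements sit in small partitions, this floor can drop to 0, which would allow outputs below min.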
Is there a way to guarantee that every partition ends up with at least min elements?
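One possible approach, offered as a sketch rather than a known-best answer: give every element a global index (in Spark, rdd.zipWithIndex()), key each element by that index, and partition on it, for example with partitionBy(new HashPartitioner(numPart)), which for non-negative Long keys below 2^31 places index i in partition i % numPart. Each partition then holds either floor(N / numPart) or ceil(N / numPart) elements, so the minimum can be met exactly when N >= numPart * min. The plain-Scala model below only checks that arithmetic; balancedSizes and canSatisfy are hypothetical names.

```scala
object IndexBalancedModel {
  // Models partitioning by global index: element i goes to partition i % numPart.
  def balancedSizes(total: Long, numPart: Int): Seq[Long] = {
    val base = total / numPart
    val extra = total % numPart // partitions 0 until extra receive one more element
    (0 until numPart).map(p => if (p < extra) base + 1 else base)
  }

  // Index-based balancing meets the minimum only if there are enough elements overall.
  def canSatisfy(total: Long, numPart: Int, min: Int): Boolean =
    total >= numPart.toLong * min
}
```

With the 8122 elements above and numPart = 32, this gives 26 partitions of 254 and 6 of 253. Note that zipWithIndex runs an extra Spark job to compute per-partition offsets when the RDD has more than one partition.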