6 votes

Suppose I create such an RDD (I am using Pyspark):

list_rdd = sc.parallelize(xrange(0, 20, 2), 6)

then I print the partitioned elements with the glom() method and obtain

[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]]

How has Spark decided how to partition my list? Where does that specific choice of elements come from? It could have grouped them differently, leaving elements other than 0 and 10 alone, and still produced the 6 requested partitions. On a second run, the partitions are the same.

Using a larger range, with 15 elements, I get partitions in a repeating pattern of two elements followed by three elements:

list_rdd = sc.parallelize(xrange(0, 30, 2), 6)
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]]

Using a smaller range of 5 elements I get

list_rdd = sc.parallelize(xrange(0, 10, 2), 6)
[[], [0], [2], [4], [6], [8]]

So what I infer is that Spark generates the partitions by splitting the list into a repeating pattern: a smallest-possible partition followed by one or more larger ones.

The question is whether there is a reason behind this choice, which is very elegant, and whether it also provides performance advantages.


1 Answer

3 votes

Unless you specify a particular partitioner, this is "random" only in the sense that it depends on the specific implementation of that RDD. In this case you can head to ParallelCollectionRDD to dig into it further.

getPartitions is defined as:

val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray

where slice is commented as (reformatted to fit better):

/**
* Slice a collection into numSlices sub-collections. 
* One extra thing we do here is to treat Range collections specially, 
* encoding the slices as other Ranges to minimize memory cost. 
* This makes it efficient to run Spark over RDDs representing large sets of numbers. 
* And if the collection is an inclusive Range, 
* we use inclusive range for the last slice.
*/

Note that there are some considerations with regard to memory, so, again, the exact behavior is specific to the implementation.
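The boundary computation inside slice can be sketched in plain Python (the names positions and slice_collection below are mine, not Spark's): slice i of a length-n collection covers the index range [i*n/numSlices, (i+1)*n/numSlices) using integer division, which is exactly what produces the alternating small/large pattern you observed.

```python
# Sketch of the slice-boundary logic in ParallelCollectionRDD (not the
# actual Spark source). Slice i covers indices
# [i*length // num_slices, (i+1)*length // num_slices).
def positions(length, num_slices):
    """Yield (start, end) index pairs, one per slice."""
    for i in range(num_slices):
        start = (i * length) // num_slices
        end = ((i + 1) * length) // num_slices
        yield start, end

def slice_collection(seq, num_slices):
    """Split seq into num_slices contiguous sub-lists."""
    return [list(seq[start:end])
            for start, end in positions(len(seq), num_slices)]

print(slice_collection(range(0, 20, 2), 6))
# [[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]]
```

This reproduces all three of the partitionings in the question: with 10 elements and 6 slices the integer boundaries fall at 0, 1, 3, 5, 6, 8, 10, giving the sizes 1, 2, 2, 1, 2, 2 you saw from glom().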