
I have a question about the default partitioning of an RDD.

case class Animal(id: Int, name: String)

val myRDD = session.sparkContext.parallelize(
  Array(Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
        Animal(4, "Tiger"), Animal(5, "Chetah")))

println(myRDD.getNumPartitions)

I am running the above piece of code on my laptop, which has 12 logical cores, so I see that 12 partitions are created.

My understanding is that hash partitioning is used to determine which object goes to which partition, so in this case the formula would be hashCode() % 12. But when I examine further, I see that all the objects are placed in the last partition.

myRDD.foreachPartition(e => { println("----------"); e.foreach(println) })

The above code prints the following (the first eleven partitions appear empty and the last one has all the objects; the dashed lines separate the contents of each partition):

----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
----------
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)

I don't know why this happens. Can you please help?

Thanks!


1 Answer


I don't think that means all your data is in the last partition. Since foreachPartition runs the partitions in parallel, the dashed lines from all the tasks may simply have been printed before any of the values. The order of the printed lines does not tell you which partition each value belongs to.

If you try the code below (source), which tags each element with the index of the partition it lives in, you can see that the data is evenly distributed across the partitions (at least on my machine):

myRDD.mapPartitionsWithIndex((index, itr) => itr.map(x => s"$x#$index")).collect()
// res6: Array[String] = Array(Animal(1,Lion)#1, Animal(2,Elephant)#2, Animal(3,Jaguar)#3, Animal(4,Tiger)#4, Animal(5,Chetah)#5)
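Worth adding: parallelize does not hash elements at all. It splits the local collection positionally into one index range per partition. Below is a minimal pure-Scala sketch of that slicing arithmetic, modeled on the positions helper in Spark's ParallelCollectionRDD; SliceDemo and the hard-coded 12 slices are illustrative (12 matching the partition count in the question), not Spark API:

```scala
// Sketch of how Spark slices a local collection into partitions:
// each slice i covers element indices [i*length/numSlices, (i+1)*length/numSlices).
// No hashing is involved anywhere.
object SliceDemo {
  // (start, end) element-index range of each slice
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end   = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  def main(args: Array[String]): Unit = {
    val animals = Seq("Lion", "Elephant", "Jaguar", "Tiger", "Chetah")
    positions(animals.length, 12).zipWithIndex.foreach { case ((s, e), i) =>
      println(s"partition $i -> ${animals.slice(s, e).mkString(", ")}")
    }
  }
}
```

With 5 elements and 12 slices, each animal lands alone in its own partition under this formula (and 7 partitions stay empty), which is why the hashCode() % 12 mental model does not match what foreachPartition shows.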