
Second attempt here, as I presented the wrong example initially. From the docs:

preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn't modify the keys.

Nice prose, but what does it actually mean?

Here is a contrived, trivial example. Whether I pass true or false to mapPartitions, the distribution of data across partitions remains the same for the new RDD, even though I alter the K of (K, V). So what is the point? It must be something elementary that I am missing.

import org.apache.spark.HashPartitioner

// Some contrived function that rewrites every key
def myfunc(iter: Iterator[(String, (Int, String))]): Iterator[(String, (Int, String))] = {
  iter.map { case (x, y) => ("B" + x + "A", y) }
}

val rdd1 = sc.parallelize(1 to 9)
  .map(x => ("RFD" + x + "ABC", (1000, "xc888x" + x)))
  .partitionBy(new HashPartitioner(459))
val rdd2 = rdd1.mapPartitions(myfunc, true) // or false
rdd2.collect

Inspecting rdd2 with mapPartitionsWithIndex reveals, for both true and false:

res21: Array[String] = Array((BRFD5ABCA,(1000,xc888x5)) -> 22, (BRFD4ABCA,(1000,xc888x4)) -> 66, (BRFD3ABCA,(1000,xc888x3)) -> 110, (BRFD2ABCA,(1000,xc888x2)) -> 154, (BRFD1ABCA,(1000,xc888x1)) -> 198, (BRFD9ABCA,(1000,xc888x9)) -> 305, (BRFD8ABCA,(1000,xc888x8)) -> 349, (BRFD7ABCA,(1000,xc888x7)) -> 393, (BRFD6ABCA,(1000,xc888x6)) -> 437)

which is the same partition distribution as rdd1.

So, what is the point of true or false for preservesPartitioning then?


1 Answer


This was also quite non-intuitive for me. Let me quote a statement from the Apache Spark User List that suits your question:

This is not what preservesPartitioning does -- actually what it means is that if the RDD has a Partitioner set (which means it's an RDD of key-value pairs and the keys are grouped into a known way, e.g. hashed or range-partitioned), your map function is not changing the partition of keys. This lets the job scheduler know that downstream operations, like joins or reduceByKey, can be optimized assuming that all the data for a given partition is located on the same machine. In both cases though, your function f operates on each partition.
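
A quick way to observe this directly (a minimal sketch, reusing rdd1 and myfunc from your question; the exact rendering of the Partitioner instance depends on your Spark version) is to inspect the partitioner field of the resulting RDDs:

val kept    = rdd1.mapPartitions(myfunc, preservesPartitioning = true)
val dropped = rdd1.mapPartitions(myfunc, preservesPartitioning = false)

kept.partitioner    // Some(org.apache.spark.HashPartitioner@...) -- partitioner carried over
dropped.partitioner // None -- partitioner discarded

The physical placement of the rows is identical in both cases; only this piece of metadata differs, and it is exactly what the scheduler consults when planning downstream operations.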

In your example, consider the following code with preservesPartitioning=false:

val rdd2 = rdd1.mapPartitions(myfunc, false)
rdd2.groupByKey().map { case (key, values) => values.size }.toDebugString

gives:

(459) MapPartitionsRDD[5] at map at Spark.scala:44 []
  |   ShuffledRDD[4] at groupByKey at Spark.scala:44 []
  +-(459) MapPartitionsRDD[3] at mapPartitions at Spark.scala:42 []
      |   ShuffledRDD[2] at partitionBy at Spark.scala:41 []
      +-(4) MapPartitionsRDD[1] at map at Spark.scala:41 []
         |  ParallelCollectionRDD[0] at parallelize at Spark.scala:41 []

While with preservesPartitioning=true:

(459) MapPartitionsRDD[5] at map at Spark.scala:44 []
  |   MapPartitionsRDD[4] at groupByKey at Spark.scala:44 []
  |   MapPartitionsRDD[3] at mapPartitions at Spark.scala:42 []
  |   ShuffledRDD[2] at partitionBy at Spark.scala:41 []
  +-(4) MapPartitionsRDD[1] at map at Spark.scala:41 []
     |  ParallelCollectionRDD[0] at parallelize at Spark.scala:41 []

So in the first case, groupByKey causes an additional shuffle, because Spark does not know that all occurrences of a key reside in the same partition (the partitioner is lost). In the second case, groupByKey is translated into a simple mapPartitions, because Spark knows that the first mapPartitions did not change the partitioning, i.e. that all values for a given key are still in the same partition.
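
Note that in your example passing true is actually a lie: myfunc does modify the keys, so the preserved HashPartitioner no longer describes where the new keys would hash to. Downstream operations that trust it can then silently return wrong results. A minimal sketch of the pitfall (same rdd1 and myfunc as above; the exact count depends on how the rewritten keys happen to hash):

// "lying" claims to be hash-partitioned, but the rewritten keys
// no longer live in the partitions HashPartitioner(459) would assign them to.
val lying  = rdd1.mapPartitions(myfunc, preservesPartitioning = true)

// "honest" applies the same key rewrite via map (which drops the partitioner)
// and then genuinely re-partitions by the new keys.
val honest = rdd1.map { case (x, y) => ("B" + x + "A", y) }
                 .partitionBy(new HashPartitioner(459))

// Both RDDs report the same partitioner, so the join is planned without a
// shuffle, pairing partition i with partition i -- and misses most matches.
lying.join(honest).count() // likely less than 9, even though the key sets are identical

So preservesPartitioning=true is only safe when your function genuinely leaves the keys (and therefore their partition assignment) untouched; otherwise leave it false and let Spark re-shuffle when needed.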