Suppose I start by partitioning my data at the outset, i.e.:

val rdd: RDD[(Int, String)] = ...
val data = rdd.partitionBy(new RangePartitioner(8, rdd)).persist()

and then let's say I did:

val groupedData = data.groupByKey() // what is the partitioner here?

By default, groupByKey uses a HashPartitioner. In this case, will it keep using the parent's partitioner, or will it repartition the data according to the default?

I looked at Partitioner#defaultPartitioner, and it does seem that the parent's partitioner is reused (the default partitioning logic applies only if no explicit partitioner has been specified), but I thought I'd do a sanity check.

As an additional question: leaving aside key-changing transformations such as #map and #flatMap, do all key-preserving transformations hold on to and propagate the explicitly defined partitioner? And if I never partition the data explicitly, do they propagate the previous step's default partitioner?

i.e.:

rdd.groupByKey() // hash-partitioner by default
.mapValues(_.head)
.sortByKey() // range-partitioner by default, but does it use the hash-partitioner from before?
1 Answer

In the first case the partitioner will be preserved. This is easy to check:

for {
  p1 <- groupedData.partitioner
  p2 <- data.partitioner
} yield p1 == p2

// Some(true)

In the second case the partitioner is an inherent part of the transformation: sortByKey always builds its own RangePartitioner, so the HashPartitioner from groupByKey will be discarded:

val grouped = rdd.groupByKey()

for {
  p1 <- grouped.partitioner
  p2 <- grouped.sortByKey().partitioner
} yield p1 == p2

// Some(false)
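
As for the additional question about key-preserving transformations, the same kind of check can be run there. The following is only a minimal sketch: the object name PartitionerPropagation, the check helper, the sample data and the local master are all made up for illustration, and a HashPartitioner is used instead of the RangePartitioner from the question just to keep it short. As far as I know, mapValues and filter keep the parent's partitioner, while map drops it even when the function leaves the keys untouched, so I would expect all three flags to come back true.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Hypothetical helper object, not part of Spark or of the question's code.
object PartitionerPropagation {

  // Returns (mapValues keeps the partitioner, filter keeps it, map drops it).
  def check(sc: SparkContext): (Boolean, Boolean, Boolean) = {
    // Made-up sample data, explicitly partitioned up front.
    val data = sc
      .parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
      .partitionBy(new HashPartitioner(4))

    // mapValues and filter cannot change keys, so the parent's partitioner is kept.
    val mapValuesKeeps = data.mapValues(_.toUpperCase).partitioner == data.partitioner
    val filterKeeps = data.filter(_._1 > 1).partitioner == data.partitioner

    // map may change keys, so the result has no partitioner,
    // even though this particular function leaves the keys untouched.
    val mapDrops = data.map { case (k, v) => (k, v) }.partitioner.isEmpty

    (mapValuesKeeps, filterKeeps, mapDrops)
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster.
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitioner-propagation")
    val sc = new SparkContext(conf)
    try println(check(sc)) finally sc.stop()
  }
}
```

If you never partition explicitly, the same rule applies to whatever partitioner the previous shuffle left behind: mapValues after groupByKey still reports the HashPartitioner, and only a transformation that needs a different layout (such as sortByKey) replaces it.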