
I have an RDD with n partitions and I would like to split it into k RDDs in such a way that

rdd = rdd_1.union(rdd_2).union(rdd_3)...union(rdd_k)

So, for example, if n=10 and k=2, I would like to end up with 2 RDDs where rdd_1 is composed of 5 partitions and rdd_2 is composed of the other 5 partitions.

What is the most efficient way to do this in Spark?

Providing more info on what you are trying to achieve might get you a more helpful answer. E.g. if you want to balance your partition sizes, you can use repartition. I cannot see how having an RDD per partition would serve any purpose unless you also had your own partitioner. Also note that there are many functions that can make use of the partition index, so you can simply return an empty result for invalid partitions. Last but not least, using groupBy could also be applicable if your partitions have a logical split. – Ioannis Deligiannis
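
For reference, the repartition route mentioned in the comment could look like the following. This is a minimal sketch, assuming the actual goal is balancing partition sizes rather than producing separate RDDs; sc and the sample data are placeholders, not from the original post:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

val sc: SparkContext = ???
// Illustrative input: 100 elements spread over 10 partitions
val rdd: RDD[Int] = sc.parallelize(1 to 100, numSlices = 10)
// Redistribute the data evenly across 4 partitions (this incurs a shuffle)
val balanced = rdd.repartition(4)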

1 Answer


You can try something like this:

import org.apache.spark.rdd.RDD

val rdd: RDD[T] = ???
val k: Int = ???
val n = rdd.partitions.size

val rdds = (0 until n) // Create a Seq of partition indices
  .grouped(n / k) // Group it into fixed-size buckets
                  // (note: if k does not divide n evenly, this yields more than k groups)
  .map(idxs => (idxs.head, idxs.last)) // Take the first and the last index
  .map {
    case (min, max) => rdd.mapPartitionsWithIndex(
      // If the partition is in the [min, max] range keep its iterator,
      // otherwise return an empty one
      (i, iter) => if (i >= min && i <= max) iter else Iterator.empty
    )
  }

If the input RDD has complex dependencies, you should cache it before applying this.
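
For context, here is a hypothetical usage sketch with the question's numbers (n = 10, k = 2), assuming an existing SparkContext sc; the sample data and names are illustrative:

val input = sc.parallelize(1 to 100, numSlices = 10).cache()
// ... build `rdds` from `input` as in the snippet above, with k = 2 ...
val parts = rdds.toSeq
// Note: each sub-RDD still reports all 10 partitions; the partitions
// outside its index range are simply empty rather than removed
parts.foreach(r => println(r.partitions.size))
// Together the sub-RDDs should cover all of the original data
assert(parts.map(_.count()).sum == input.count())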