2
votes

I have a list of 80 RDDs that I want to process and then ultimately join. The "process" part consists of doing a map and a reduceByKey on each RDD. Then I "join" them by doing a union. Here's a sketch of my code:

rdds0.foreach(_.persist()) //rdds0 are persisted

//trigger a map and a shuffle for each rdd
val rdds = rdds0.map(rdd => rdd.map(f1).reduceByKey(f2))

//action on the union of the rdds
sparkContext.union(rdds).collect()

However, I have issues with the DAG that Spark generates. It looks like this:

  • 80 stages, one for each "map" of each RDD

  • 1 final stage for the union, that starts with 80 reduceByKey in parallel

My issue is with that final stage. AFAIK, this means that Spark will schedule all 80 reduceByKey operations in parallel, each of them taking a lot of memory. It seems more efficient to do the reduceByKey() for each RDD individually as soon as its map stage is done. Instead, no reduceByKey can be performed before all the map stages are done, and then they are all scheduled at the same time.

Is there a way to force Spark somehow to execute the reduceByKey() operations ASAP instead of waiting for all the map tasks? I thought this was a matter of union() creating a PartitionerAwareUnionRDD instead of a UnionRDD, but it seems that both RDD types generate the same DAG.


1 Answer

1
votes

reduceByKey is a wide transformation - this means it has:

  • a "map-side" component - the part of the operation that happens before the shuffle - contained in the first stage of your DAG.

  • a "reduce-side" component - the part of the operation that happens after the shuffle - contained in the second stage of your DAG.

The results of the "reduce-side" component are piped directly into the union. There is really nothing to optimize here: the map-side combine for each RDD already runs inside that RDD's own map stage, so only the post-shuffle merge has to wait for the final stage.
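To see the split concretely: internally, reduceByKey(f2) is equivalent to a combineByKey where the same function is used both for the map-side (pre-shuffle) combine and the reduce-side (post-shuffle) merge. A hedged sketch, reusing the f1 and f2 from the question and assuming the values have some type V:

```scala
// Sketch only - assumes the question's rdds0, f1 and f2, and a value type V.
val rdds = rdds0.map { rdd =>
  rdd.map(f1).combineByKey(
    (v: V) => v, // createCombiner: first value seen for a key in a partition
    f2,          // mergeValue: map-side combine, runs inside the map stage
    f2           // mergeCombiners: reduce-side merge, runs after the shuffle
  )
}
```

Because mergeValue runs as part of each map task, the per-RDD partial aggregation you want "as soon as the map stage is done" is already happening there; the final stage only performs the cross-partition merge.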