1
votes

I have a HadoopRDD from which I'm creating a first RDD with a simple map function, then a second RDD from the first RDD with another simple map function. Something like:

HadoopRDD -> RDD1 -> RDD2.

My question is whether Spark will iterate over the HadoopRDD record by record to generate RDD1 and then iterate over RDD1 record by record to generate RDD2, or whether it iterates over the HadoopRDD and generates RDD1 and then RDD2 in one go.
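
For context, a minimal sketch of the setup described above (the input path and the two map functions are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("pipelining").setMaster("local[*]"))

// textFile is backed by a HadoopRDD under the hood
val hadoopRdd = sc.textFile("hdfs:///some/input/path")  // placeholder path

val rdd1 = hadoopRdd.map(_.trim)    // first simple map
val rdd2 = rdd1.map(_.length)       // second simple map

rdd2.count()  // nothing runs until an action such as this is called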

2 Answers

3
votes

Short answer: rdd.map(f).map(g) will be executed in one pass.

tl;dr

Spark splits a job into stages. A stage applied to a partition of data is a task.

In a stage, Spark will try to pipeline as many operations as possible. "Possible" is determined by the need to rearrange data: an operation that requires a shuffle will typically break the pipeline and create a new stage.

In practical terms:

Given `rdd.map(...).map(...).filter(...).sortBy(...).map(...)`, Spark will produce two stages:

.map(...).map(...).filter(...)
.sortBy(...).map(...)

The stage layout can be retrieved from an RDD using rdd.toDebugString. The same example job above will produce this output:

val mapped = rdd.map(identity).map(identity).filter(_>0).sortBy(x=>x).map(identity)

scala> mapped.toDebugString
res0: String = 
(6) MappedRDD[9] at map at <console>:14 []
 |  MappedRDD[8] at sortBy at <console>:14 []
 |  ShuffledRDD[7] at sortBy at <console>:14 []
 +-(8) MappedRDD[4] at sortBy at <console>:14 []
    |  FilteredRDD[3] at filter at <console>:14 []
    |  MappedRDD[2] at map at <console>:14 []
    |  MappedRDD[1] at map at <console>:14 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:12 []

Now, coming to the key point of your question: pipelining is very efficient. The complete pipeline will be applied to each element of each partition exactly once. This means that rdd.map(f).map(g) will perform as fast as rdd.map(f andThen g) (with some negligible overhead).
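
A quick way to see that equivalence in practice (f and g below are arbitrary placeholder functions):

// Placeholder functions; any two composable functions behave the same way.
val f: Int => Int = _ + 1
val g: Int => Int = _ * 2

val nums = sc.parallelize(1 to 1000000)

// Both expressions stay in a single stage; each element flows through
// f and then g exactly once per partition.
val chained  = nums.map(f).map(g)
val composed = nums.map(f andThen g)

// chained.toDebugString and composed.toDebugString show the same single-stage
// lineage, modulo one extra MappedRDD node for the chained version.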

0
votes

Apache Spark will iterate over the HadoopRDD record by record in no specific order (the data is split and sent to the workers) and "apply" the first transformation to compute RDD1. After that, the second transformation is applied to each element of RDD1 to get RDD2, again in no specific order, and so on for successive transformations. You can see this from the map method signature:

// Return a new RDD by applying a function to all elements of this RDD.
def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]

Apache Spark uses a DAG (Directed Acyclic Graph) execution engine. It won't actually trigger any computation until a value is required, so you have to distinguish between transformations and actions.
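
A small sketch of that distinction, using placeholder data:

val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Transformations: only recorded in the DAG, no computation happens yet.
val doubled  = numbers.map(_ * 2)
val filtered = doubled.filter(_ > 4)

// Action: only here does Spark build stages and actually run the job.
val result = filtered.collect()  // Array(6, 8, 10)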

EDIT:

In terms of performance, I am not completely aware of the underlying implementation of Spark, but I understand there shouldn't be a significant performance loss other than adding extra (unnecessary) operations in the related stage. In my experience, you don't normally use transformations of the same "nature" successively (in this case two successive maps). You should be more concerned about performance when shuffle operations take place, because you are moving data around, and that has a clear impact on your job's performance. Here you can find a common issue regarding that.
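
For example, a chain of maps stays within one stage, while a key-based aggregation forces a shuffle and a new stage (a rough sketch with placeholder data):

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

val counts = words
  .map(w => (w, 1))     // narrow transformation: pipelined within the same stage
  .reduceByKey(_ + _)   // wide transformation: forces a shuffle and a new stage

println(counts.toDebugString)  // the ShuffledRDD marks the stage boundary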