
I'm a newbie in Spark and I would like to understand some of the basic mechanisms of how it works behind the scenes. I have attached the lineage of my RDD, and I have the following questions:

  1. Why do I have 8 stages instead of 5? From the book Learning Spark (Chapter 8, http://bit.ly/1E0Hah7), I understood that "RDDs that exist at the same level of indentation as their parents will be pipelined [into the same physical stage] during physical execution". Since my lineage has 5 levels of indentation, I expected 5 stages. Still, the Spark UI stages view shows 8 stages. Also, what does the (8) in the debug string represent? Is there a bug in this function?
  2. At the stage level, what is the execution order among the tasks?

     Can they all be executed in parallel,

     HadoopRDD[0] || MappedRDD[1] || MapPartitionsRDD[4] || ZippedWithIndexRDD[6]

     or does each task wait for the previous one to complete?

     HadoopRDD[0] => completed => MappedRDD[1] => completed => etc.
  3. Between stages, the order is given by the execution plan, so each stage waits until the stages before it have completed. Is this assumption correct?
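One way to picture question 2 is the following pure-Scala model. It is not Spark code, and `PipelineSketch` and `runTask` are hypothetical names. It assumes what Spark does for narrow dependencies: the RDDs pipelined into one stage are not run one after another as whole datasets. Instead, each task (one per partition) chains their iterators, so a single record passes through every step before the next record is read. Tasks for different partitions of the same stage can run in parallel, up to the number of available cores.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  // Simulates ONE task of a stage (hypothetical model, not Spark's API).
  // Each narrow transformation wraps the previous iterator, like RDD.compute does,
  // and logs when it touches a record so the processing order becomes visible.
  def runTask(partition: Seq[String], log: ListBuffer[String]): List[String] = {
    // Stand-in for reading the input split (like textFile / HadoopRDD)
    val read = partition.iterator.map { line => log += s"read $line"; line }
    // Stand-in for a map step (like MappedRDD)
    val mapped = read.map { line => log += s"map $line"; line.toUpperCase }
    // Stand-in for a flatMap step (like FlatMappedRDD)
    val flat = mapped.flatMap { line => log += s"flatMap $line"; line.split(",").iterator }
    // Nothing has run yet: draining the last iterator pulls records through the chain
    flat.toList
  }
}
```

For the input `Seq("a,b", "c")`, the log interleaves `read`, `map` and `flatMap` record by record (`read a,b`, `map a,b`, `flatMap A,B`, then `read c`, ...) instead of finishing each step for the whole partition before starting the next.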

I look forward to your answers.

Regards, Florin

(8) MappedRDD[21] at map at WAChunkSepvgFilterNewModel.scala:298 []
 |  MappedRDD[20] at map at WAChunkSepvgFilterNewModel.scala:182 []
 |  ShuffledRDD[19] at sortByKey at WAChunkSepvgFilterNewModel.scala:182 []
 +-(8) ShuffledRDD[16] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:182 []
    +-(8) FlatMappedRDD[15] at flatMap at WAChunkSepvgFilterNewModel.scala:174 []
       |  ZippedWithIndexRDD[14] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:174 []
       |  MappedRDD[13] at map at WAChunkSepvgFilterNewModel.scala:272 []
       |  MappedRDD[12] at map at WAChunkSepvgFilterNewModel.scala:161 []
       |  ShuffledRDD[11] at sortByKey at WAChunkSepvgFilterNewModel.scala:161 []
       +-(8) ShuffledRDD[8] at aggregateByKey at WAChunkSepvgFilterNewModel.scala:161 []
          +-(8) FlatMappedRDD[7] at flatMap at WAChunkSepvgFilterNewModel.scala:153 []
             |  ZippedWithIndexRDD[6] at zipWithIndex at WAChunkSepvgFilterNewModel.scala:153 []
             |  MappedRDD[5] at map at WAChunkSepvgFilterNewModel.scala:248 []
             |  MapPartitionsRDD[4] at mapPartitionsWithIndex at WAChunkSepvgFilterNewModel.scala:114 []
             |  test4spark.csv MappedRDD[1] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
             |  test4spark.csv HadoopRDD[0] at textFile at WAChunkSepvgFilterNewModel.scala:215 []
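The debug string above can be read mechanically. The sketch below (hypothetical helper `LineageStages`, assuming a linear lineage with no joins and no shared stages) treats every `+-` marker as a shuffle dependency, i.e. a stage boundary, and reads the `(N)` prefix as the partition count of the first-listed (last-computed) RDD of each stage, which is also the number of tasks that stage runs. In Spark itself, `rdd.toDebugString` produces the string and `rdd.partitions.length` returns the partition count.

```scala
object LineageStages {
  // Matches the "(N)" partition-count prefix at the start of a lineage entry.
  private val PartitionPrefix = """^\((\d+)\)""".r

  // Drops the tree-drawing characters (spaces, "|", "+-") used for indentation,
  // leaving either "(N) RDDName[id] ..." or "RDDName[id] ...".
  private def entry(line: String): String =
    line.dropWhile(c => c == ' ' || c == '|' || c == '+' || c == '-')

  // Assumption: linear lineage, so stages = shuffle boundaries ("+-" markers) + 1.
  def countStages(debug: String): Int =
    1 + debug.split("\n").count(_.trim.startsWith("+-"))

  // One entry per stage: the partition count, i.e. the number of tasks in that stage.
  def tasksPerStage(debug: String): Seq[Int] =
    debug.split("\n").toSeq.flatMap { line =>
      PartitionPrefix.findFirstMatchIn(entry(line)).map(_.group(1).toInt)
    }
}
```

Applied to the lineage above, this yields 5 stages of 8 tasks each for the job started by the final action. Jobs that `zipWithIndex` and `sortByKey` launch on their own also appear in the UI's stages view and are not counted here.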
It would be easier to explain if you included the actual code. - Marko Bonaci
The zipWithIndex method starts a new job and a new stage. Since I have 2 zipWithIndex calls, I get 2 extra stages. Also, the sortByKey method starts a new job with a stage, so I get another extra stage. Therefore I have 8 stages. The answers to questions 2 and 3 don't depend on my code; they should be covered by the Spark execution framework at its core, but I could not find any documentation on these subjects. - florins
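Why zipWithIndex needs its own job can be modeled in plain Scala. Spark's documentation for `RDD.zipWithIndex` notes that it triggers a Spark job when the RDD has more than one partition, because each partition's starting index depends on the sizes of all the partitions before it. The sketch below (hypothetical `ZipWithIndexSketch`, with partitions modeled as `Seq[Seq[T]]`; a simplified model, not Spark's implementation) separates the size-counting pass, which stands in for that extra job, from the numbering pass.

```scala
object ZipWithIndexSketch {
  // Pass 1 (stands in for the extra job): count the records in every partition
  // except the last, then turn the sizes into starting offsets with a prefix sum.
  def startIndices[T](partitions: Seq[Seq[T]]): Seq[Long] =
    partitions.dropRight(1).map(_.size.toLong).scanLeft(0L)(_ + _)

  // Pass 2 (the transformation itself): each partition numbers its own records,
  // starting from the offset computed for it in pass 1.
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    val starts = startIndices(partitions)
    partitions.zip(starts).map { case (part, start) =>
      part.zipWithIndex.map { case (x, i) => (x, start + i) }
    }
  }
}
```

With partitions `Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))`, the start offsets are 0, 2 and 3, so the indices run from 0 to 4 across partitions. The last partition is never counted because no later partition needs its size.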

1 Answer


(8) represents the number of partitions in the RDD. Regarding point 3, your assumption is correct: the tasks of the next stage are started only once all the tasks belonging to the current stage have completed.
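The stage barrier described in this answer can be modeled with Scala futures. In this sketch (hypothetical `StageBarrierSketch`; a thread pool stands in for executors), all tasks of a stage run concurrently, and the next stage is only submitted after `Await.result` has seen every task of the current stage finish. It assumes a linear chain of stages; in Spark, independent parent stages, such as the two sides of a join, can run at the same time.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StageBarrierSketch {
  // Runs numStages stages in order, each with tasksPerStage concurrent tasks,
  // and returns the "start"/"end" events in the order they were recorded.
  def runStages(numStages: Int, tasksPerStage: Int): List[String] = {
    val events = ListBuffer.empty[String]
    // Tasks run on different threads, so appends are synchronized.
    def record(e: String): Unit = events.synchronized { events += e }

    for (stage <- 0 until numStages) {
      // Submit every task of this stage at once; they may run in any order.
      val tasks = (0 until tasksPerStage).map { task =>
        Future {
          record(s"start $stage.$task")
          record(s"end $stage.$task")
        }
      }
      // Barrier: block until every task of this stage has completed.
      Await.result(Future.sequence(tasks), 10.seconds)
    }
    events.synchronized(events.toList)
  }
}
```

The returned event list always contains every event of stage 0 before any event of stage 1, while the order of tasks within a stage can vary between runs.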