
I ran the following job in the spark-shell:

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

The Spark UI shows three stages. Stages 4 and 5 correspond to the computation of d, and stage 6 corresponds to the collect action. Since d is persisted, I would expect only two stages. However, stage 5 is present but not connected to any other stage.

Spark UI DAG

So I tried running the same computation without persist, and the DAG looks identical, except without the green dots indicating that the RDD has been persisted.

Spark UI DAG without persist

I would expect the output of stage 11 to be connected to the input of stage 12, but it is not.

Looking at the stage descriptions, they do seem to indicate that d is being persisted, since stage 5 has input, but I am still confused as to why stage 5 exists at all.

Spark UI stages

Spark UI stages without persist


2 Answers

  1. The input RDD is cached and the cached part is not recomputed.

    This can be validated with a simple test:

    import org.apache.spark.SparkContext
    
    def f(sc: SparkContext) = {
      val counter = sc.longAccumulator("counter")
      val rdd = sc.parallelize(0 until 100).map(i => {
        counter.add(1L)
        (i%10, i)
      }).persist
      rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
      counter.value
    }
    
    assert(f(spark.sparkContext) == 100)
    
  2. Caching doesn't remove stages from the DAG.

    If data is cached, the corresponding stages can be marked as skipped, but they are still part of the DAG. Lineage can be truncated using checkpoints, but that is not the same thing, and it doesn't remove stages from the visualization (a small sketch contrasting the two is included after this list).

  3. Input stages contain more than just the cached computation.

    Spark stages group together operations which can be chained without performing a shuffle.

    While part of the input stage is cached, it doesn't cover all of the operations required to prepare the shuffle files. This is why you don't see skipped tasks.

  4. The rest (the apparent detachment of the stage) is just a limitation of the graph visualization.

  5. If you repartition the data first:

    import org.apache.spark.HashPartitioner
    
    val d = sc.parallelize(0 until 1000000)
      .map(i => (i%100000, i))
      .partitionBy(new HashPartitioner(20))
    
    d.join(d.reduceByKey(_ + _)).collect
    

    you'll get the DAG you're most likely looking for:

    Spark UI DAG with partitionBy
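
As a side note on point 2, the difference between caching and checkpointing shows up directly in the lineage. The snippet below is only a minimal sketch run against a small dataset, assuming a spark-shell session and a scratch checkpoint directory (the path and variable names are illustrative, not part of the original job):

    sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumed scratch path

    val cached = sc.parallelize(0 until 1000).map(i => (i % 10, i)).persist
    cached.count()                      // first action materializes the cache
    println(cached.toDebugString)       // full lineage is still listed (plus cached-partition info)

    val checkpointed = sc.parallelize(0 until 1000).map(i => (i % 10, i))
    checkpointed.checkpoint()           // must be called before the first action on this RDD
    checkpointed.count()                // the job runs, then checkpoint files are written
    println(checkpointed.toDebugString) // lineage is truncated at a checkpoint RDD

Caching keeps the full DAG (so stages can be re-run if cached blocks are lost), while checkpointing replaces the parents with the checkpoint data; neither changes how stages were drawn for a job that was already submitted.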


Adding to user6910411's detailed answer: because of the lazy evaluation of RDDs, an RDD is not persisted in memory until the first action runs and computes the whole DAG. So the first time you run collect(), the RDD d gets persisted in memory, but nothing is read from memory yet. If you run collect() a second time, the cached RDD is read.
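
This is easy to see from toDebugString as well. The snippet below is just a small sketch on a reduced range (not output from the original session; the RDD name small is illustrative, and the exact formatting of the storage line can differ between Spark versions):

    val small = sc.parallelize(0 until 1000).map(i => (i % 10, i)).persist
    println(small.toDebugString)  // no storage info yet: nothing has been materialized
    small.count()                 // first action computes the whole DAG and fills the cache
    println(small.toDebugString)  // now reports CachedPartitions and their memory size
    small.count()                 // second action reads the cached partitions instead of recomputing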

Also, if you call toDebugString on the final RDD, it shows the output below:

scala> d.join(d.reduceByKey(_ + _)).toDebugString
res5: String =
(4) MapPartitionsRDD[19] at join at <console>:27 []
 |  MapPartitionsRDD[18] at join at <console>:27 []
 |  CoGroupedRDD[17] at join at <console>:27 []
 +-(4) MapPartitionsRDD[15] at map at <console>:24 []
 |  |  ParallelCollectionRDD[14] at parallelize at <console>:24 []
 |  ShuffledRDD[16] at reduceByKey at <console>:27 []
 +-(4) MapPartitionsRDD[15] at map at <console>:24 []
    |  ParallelCollectionRDD[14] at parallelize at <console>:24 []

A rough graphical representation of the above can be shown as:

RDD Stages
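
The same structure can also be inspected programmatically by walking the dependency graph. The helper below is only an illustrative sketch (describe is not a Spark API; it assumes the d from the question is still defined in the shell); it flags every ShuffleDependency, which is exactly where Spark cuts the DAG into stages:

    import org.apache.spark.ShuffleDependency
    import org.apache.spark.rdd.RDD

    // Print each RDD in the lineage, marking shuffle boundaries (= stage cuts).
    def describe(rdd: RDD[_], depth: Int = 0): Unit = {
      println(("  " * depth) + s"${rdd.getClass.getSimpleName}[${rdd.id}]")
      rdd.dependencies.foreach { dep =>
        if (dep.isInstanceOf[ShuffleDependency[_, _, _]])
          println(("  " * (depth + 1)) + "--- shuffle boundary ---")
        describe(dep.rdd, depth + 1)
      }
    }

    describe(d.join(d.reduceByKey(_ + _)))

The cached MapPartitionsRDD shows up on both shuffle branches, which matches the two input stages seen in the UI.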