I am having trouble understanding the lineage of an RDD. For instance, let's say we have this lineage:

hadoopRDD(location) <-depends- filteredRDD(f:A->Boolean) <-depends- mappedRDD(f:A->B)

If we persist the first RDD and, after some actions, unpersist it, will this affect the dependent RDDs? If yes, how can we avoid that?

My point is: if we unpersist a parent RDD, will this action remove partitions from the child RDDs?
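
In code, that lineage would look roughly like this (the path and the filter/map functions are placeholders, not taken from the question):

val hadoopRDD   = sc.textFile("hdfs://...")                // hadoopRDD(location)
val filteredRDD = hadoopRDD.filter(line => line.nonEmpty)  // filteredRDD(f: A -> Boolean)
val mappedRDD   = filteredRDD.map(line => line.length)     // mappedRDD(f: A -> B)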

1 Answer

Let's walk through an example. This creates an RDD from a Seq of Ints in a single partition; the only reason for one partition is to keep the output ordering deterministic for the rest of the example.

scala> val seq = Seq(1,2,3,4,5)
seq: Seq[Int] = List(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(seq, 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:23

Now let's create two new RDDs that are mapped versions of the original:

scala> val firstMappedRDD = rdd.map { case i => println(s"firstMappedRDD  calc for $i"); i * 2 }
firstMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:25

scala> firstMappedRDD.toDebugString
res25: String = 
(1) MapPartitionsRDD[12] at map at <console>:25 []
 |  ParallelCollectionRDD[11] at parallelize at <console>:23 []

scala> val secondMappedRDD = firstMappedRDD.map { case i => println(s"secondMappedRDD calc for $i"); i * 2 }
secondMappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:27

scala> secondMappedRDD.toDebugString
res26: String = 
(1) MapPartitionsRDD[13] at map at <console>:27 []
 |  MapPartitionsRDD[12] at map at <console>:25 []
 |  ParallelCollectionRDD[11] at parallelize at <console>:23 []

We can see the lineages using toDebugString. I added a println to each map step to make it clear when the map function is actually called. Let's collect each RDD to see what happens:

scala> firstMappedRDD.collect()
firstMappedRDD  calc for 1
firstMappedRDD  calc for 2
firstMappedRDD  calc for 3
firstMappedRDD  calc for 4
firstMappedRDD  calc for 5
res27: Array[Int] = Array(2, 4, 6, 8, 10)

scala> secondMappedRDD.collect()
firstMappedRDD  calc for 1
secondMappedRDD calc for 2
firstMappedRDD  calc for 2
secondMappedRDD calc for 4
firstMappedRDD  calc for 3
secondMappedRDD calc for 6
firstMappedRDD  calc for 4
secondMappedRDD calc for 8
firstMappedRDD  calc for 5
secondMappedRDD calc for 10
res28: Array[Int] = Array(4, 8, 12, 16, 20)

As you would expect, the map for the first step is called once again when we call secondMappedRDD.collect(). Notice how the two printlns interleave: both maps are narrow transformations in the same stage, so Spark pipelines them element by element within the partition. So now let's cache the first mapped RDD. Keep in mind that cache() is lazy: nothing is stored until an action forces the computation, which is why we call count() below.

scala> firstMappedRDD.cache()
res29: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
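
As an aside, cache() is simply shorthand for persist() at the default MEMORY_ONLY storage level. If you need different behaviour you can pass an explicit level (a sketch, not part of the session above):

import org.apache.spark.storage.StorageLevel
firstMappedRDD.persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk rather than recompute evicted partitions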

scala> secondMappedRDD.toDebugString
res31: String = 
(1) MapPartitionsRDD[13] at map at <console>:27 []
 |  MapPartitionsRDD[12] at map at <console>:25 []
 |  ParallelCollectionRDD[11] at parallelize at <console>:23 []

scala> firstMappedRDD.count()
firstMappedRDD  calc for 1
firstMappedRDD  calc for 2
firstMappedRDD  calc for 3
firstMappedRDD  calc for 4
firstMappedRDD  calc for 5
res32: Long = 5

scala> secondMappedRDD.toDebugString
res33: String = 
(1) MapPartitionsRDD[13] at map at <console>:27 []
 |  MapPartitionsRDD[12] at map at <console>:25 []
 |      CachedPartitions: 1; MemorySize: 120.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[11] at parallelize at <console>:23 []

Once the result of the first map is in the cache, the cached partitions of the first mapped RDD show up in the second one's lineage. Now let's call collect:

scala> secondMappedRDD.collect
secondMappedRDD calc for 2
secondMappedRDD calc for 4
secondMappedRDD calc for 6
secondMappedRDD calc for 8
secondMappedRDD calc for 10
res34: Array[Int] = Array(4, 8, 12, 16, 20)
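
If you'd rather not read toDebugString, you can also inspect the cache state directly (a sketch, not captured from the session above):

firstMappedRDD.getStorageLevel  // the storage level the RDD is currently persisted at
sc.getPersistentRDDs            // map of RDD id -> RDD for everything marked as persistent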

And now let's unpersist and call collect again.

scala> firstMappedRDD.unpersist()
res36: firstMappedRDD.type = MapPartitionsRDD[12] at map at <console>:25
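
Note that unpersist takes an optional blocking flag, and whether it blocks by default varies across Spark versions, so you can be explicit if it matters (a sketch):

firstMappedRDD.unpersist(blocking = false)  // remove the cached blocks asynchronously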

scala> secondMappedRDD.toDebugString
res37: String = 
(1) MapPartitionsRDD[13] at map at <console>:27 []
 |  MapPartitionsRDD[12] at map at <console>:25 []
 |  ParallelCollectionRDD[11] at parallelize at <console>:23 []

scala> secondMappedRDD.collect
firstMappedRDD  calc for 1
secondMappedRDD calc for 2
firstMappedRDD  calc for 2
secondMappedRDD calc for 4
firstMappedRDD  calc for 3
secondMappedRDD calc for 6
firstMappedRDD  calc for 4
secondMappedRDD calc for 8
firstMappedRDD  calc for 5
secondMappedRDD calc for 10
res38: Array[Int] = Array(4, 8, 12, 16, 20)

So when we collect the result of the second mapped RDD after the first has been unpersisted, the map of the first gets called again.

If the source had been HDFS, or any other storage, the data would have been retrieved from the source again.
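
To answer the "how can we avoid that" part: make sure the child no longer needs its parent before you unpersist. One option (a sketch reusing this session's names) is to persist and materialize the child first; another is to checkpoint it, which truncates the lineage entirely:

secondMappedRDD.cache()
secondMappedRDD.count()     // materializes the child's partitions into the cache
firstMappedRDD.unpersist()  // safe now: the child reads its own cached partitions

// Or cut the lineage completely with a checkpoint:
sc.setCheckpointDir("/tmp/checkpoints")  // hypothetical directory
secondMappedRDD.checkpoint()
secondMappedRDD.count()     // the checkpoint is written when an action runs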