
I have watched some YouTube videos about Spark architecture.

Lazy evaluation, the ability to recreate data after failures, and good functional programming concepts are reasons for the success of Resilient Distributed Datasets (RDDs). Even so, one worrying factor is memory overhead: because data is immutable, multiple transformations could produce multiple copies of the data.

If I understand the concept correctly, every transformation creates a new dataset, so the memory requirement grows with each one. If I use 10 transformations in my code, 10 datasets will be created and my memory consumption will increase tenfold.

For example:

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

The example above has three transformations: flatMap, map and reduceByKey. Does that imply I need 3X memory for data of size X?

Is my understanding correct? Is caching the RDD the only way to address this issue?

Once I start caching, the data may spill over to disk because of its size, and performance would suffer from disk I/O. In that case, are Hadoop and Spark comparable in performance?

EDIT:

From the answer and comments, I now understand lazy evaluation and pipelining. My assumption of 3X memory, where X is the initial RDD size, is not accurate.

But is it possible to cache 1X of the RDD in memory and update it along the pipeline? How does cache() work?

Hi, it's been a while since I worked with Spark (so I have forgotten a lot of the details), but you don't need 3x the memory. Because of the lazy evaluation you mentioned, there is basically just one operation: thanks to internal optimizations, it is essentially "one big" transformation. Once you start using the disk, the performance should tend toward that of Hadoop. - Michael
@Michael Why would the performance tend toward MR? - Justin Pihony
It's still quicker, but of course once you start using the disk the "tendency is towards Hadoop speed", which basically just means it's getting slower. But you are right in your answer that you still get performance benefits from, e.g., the lazy evaluation. - Michael

2 Answers


First off, the lazy execution means that functional composition can occur:

scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                 "And yet another test"), 1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27

scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
     | map(word => {println(word);(word,1)}).
     | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
     | collect
This is a test
This
is
a
test
This is another test
This
1+1
is
1+1
another
test
1+1
And yet another test
And
yet
another
1+1
test
2+1
counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))

First, note that I force the number of partitions down to 1 so that we can see how this looks on a single worker. Then I add a println to each of the transformations so that we can see how the workflow moves. You can see that it processes a line, then the output of that line, followed by the reduction. So separate states are not stored for each transformation, as you suggested. Instead, each record flows through the entire chain of transformations until a shuffle is needed, as can be seen in the DAG visualization from the UI:
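The same interleaving can be reproduced without a cluster. Within a partition, narrow transformations such as `flatMap` and `map` are applied by wrapping the partition's iterator, so a plain Scala `Iterator` chain is a reasonable stand-in. This is an analogy, not Spark's code: the `trace` buffer replaces the `println` calls, and the hypothetical `counts` map plays the role of the map-side combine that `reduceByKey` performs before the shuffle.

```scala
import scala.collection.mutable

object PipelineSketch {
  // Records the order in which each step touches data (stands in for println).
  val trace = mutable.ArrayBuffer.empty[String]

  val lines = List("This is a test", "This is another test", "And yet another test")

  // Narrow steps are chained lazily: nothing runs until the iterator is consumed,
  // and then each record passes through flatMap and map before the next is read.
  val pairs: Iterator[(String, Int)] = lines.iterator
    .flatMap { line => trace += line; line.split(" ").iterator }
    .map { word => trace += word; (word, 1) }

  // Hypothetical map-side combine: merge each pair into a running total as it arrives.
  val counts = mutable.LinkedHashMap.empty[String, Int]
  pairs.foreach { case (word, one) =>
    counts.get(word) match {
      case Some(x) => trace += s"$x+$one"; counts(word) = x + one
      case None    => counts(word) = one
    }
  }

  def main(args: Array[String]): Unit = {
    trace.foreach(println)
    println(counts)
  }
}
```

Printing `trace` gives the same sequence as the REPL session above, including where the `1+1` and `2+1` merges appear; only the final key order differs, because Spark hash-partitions the keys.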

[Image: DAG visualization from the Spark UI]

That is the win from the laziness. As to Spark vs. Hadoop, there is already a lot written about it (just google it), but the gist is that Spark tends to utilize network bandwidth out of the box, giving it a boost right there. Then there are a number of performance improvements gained by laziness, especially if a schema is known and you can utilize the DataFrames API.

So, overall, Spark beats MR hands down in just about every regard.


The memory requirement of Spark is not 10 times higher if you have 10 transformations in your Spark job. When you specify the transformation steps in a job, Spark builds a DAG that allows it to execute all the steps in the job. It then breaks the job down into stages. A stage is a sequence of transformations that Spark can execute on a dataset without shuffling.
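The stage-splitting rule can be sketched as a small model. This is plain Scala, not Spark's `DAGScheduler`: the `Op`, `Narrow` and `Wide` types and the `stages` function are hypothetical, and the model assumes a linear chain where a wide (shuffling) operation closes the current stage and opens the next one, which reads the shuffled data.

```scala
object StageSketch {
  // Hypothetical model: an operation either works within a partition (narrow)
  // or needs data from other partitions, i.e. a shuffle (wide).
  sealed trait Op { def name: String }
  final case class Narrow(name: String) extends Op
  final case class Wide(name: String) extends Op

  /** Cuts a linear chain of operations into stages. A wide operation ends the
    * current stage and starts the next one. Empty stages are dropped. */
  def stages(ops: List[Op]): List[List[String]] = {
    val (closed, open) =
      ops.foldLeft((List.empty[List[String]], List.empty[String])) {
        case ((acc, cur), Narrow(n)) => (acc, cur :+ n)
        case ((acc, cur), Wide(n))   => (acc :+ cur, List(n))
      }
    (closed :+ open).filter(_.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    // The word count from the question: one shuffle, so two stages.
    val wordCount = List(Narrow("textFile"), Narrow("flatMap"), Narrow("map"), Wide("reduceByKey"))
    println(stages(wordCount)) // List(List(textFile, flatMap, map), List(reduceByKey))
    // Ten narrow transformations still collapse into a single stage.
    println(stages(List.fill(10)(Narrow("map"))).size) // 1
  }
}
```

Under this model, the number of stages grows with the number of shuffles, not with the number of transformations.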

When an action is triggered on the RDD, Spark evaluates the DAG. It applies all the transformations in a stage together until it reaches the end of the stage, so the memory pressure is unlikely to be 10 times higher unless each transformation leads to a shuffle (in which case it is probably a badly written job).
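To put a rough number on that memory pressure, the sketch below compares a pipelined chain with one that builds a full intermediate collection after every step. The second case loosely resembles a job where every transformation is a stage boundary, since one stage's output has to be fully produced before the next stage reads it. It is plain Scala, not Spark, and `heldBeforeSecondStep` with its counting rule is a hypothetical illustration: it reports how many records the first step had produced when the second step received its first record.

```scala
object BufferingSketch {
  /** Runs two map steps over n records and returns how many records the first
    * step had already produced when the second step received its first record,
    * a rough proxy for how much intermediate data is held at once. */
  def heldBeforeSecondStep(n: Int, pipelined: Boolean): Int = {
    var producedByFirst = 0
    var heldAtFirstRecord = -1
    val first = (x: Int) => { producedByFirst += 1; x + 1 }
    val second = (x: Int) => {
      if (heldAtFirstRecord < 0) heldAtFirstRecord = producedByFirst
      x * 2
    }
    val input = (1 to n).toList
    if (pipelined) input.iterator.map(first).map(second).foreach(_ => ()) // one record at a time
    else input.map(first).map(second).foreach(_ => ())                   // full intermediate List between steps
    heldAtFirstRecord
  }

  def main(args: Array[String]): Unit = {
    println(heldBeforeSecondStep(1000, pipelined = true))  // 1
    println(heldBeforeSecondStep(1000, pipelined = false)) // 1000
  }
}
```

Adding more narrow steps to the pipelined chain does not change the result, which is the intuition behind "not 10 times the memory for 10 transformations".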

I would recommend watching this talk and going through the slides.