Several sources describe RDDs as ephemeral by default (e.g., this Stack Overflow answer) -- meaning that they do not stay in memory unless we call cache() or persist() on them.
So let's say our program involves an ephemeral (not explicitly cached by the user) RDD that is used in a few operations that cause the RDD to materialize. My question is: does Spark discard the materialized ephemeral RDD immediately -- or is it possible that the RDD stays in memory for other operations, even if we never asked for it to be cached?
Also, if an ephemeral RDD stays in memory, is it always only because some LRU policy has not yet kicked it out -- or can it also be because of scheduling optimizations?
I've tried to figure this out with the code below -- run in a Jupyter notebook with Python 3.5 and Spark 1.6.0, on a 4-core machine -- but I would appreciate an answer from someone who knows for sure.
import pyspark

sc = pyspark.SparkContext()

N = 1000000       # size of the dataset
THRESHOLD = 100   # some constant

def f():
    """do not cache"""
    rdd = sc.parallelize(range(N))
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

def g():
    """cache"""
    rdd = sc.parallelize(range(N)).cache()
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())
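As a sanity check I also looked at what the Python API reports for the two cases (reusing sc and N from above). As far as I understand, is_cached and getStorageLevel() only reflect the persistence that was requested via cache()/persist(), not whether any blocks are actually sitting in executor memory -- so this may not answer the question by itself:

rdd_f = sc.parallelize(range(N))          # ephemeral, as in f()
rdd_g = sc.parallelize(range(N)).cache()  # cached, as in g()

# These report the *requested* persistence, not (as far as I know)
# whether blocks are currently materialized in memory.
print(rdd_f.is_cached, rdd_f.getStorageLevel())
print(rdd_g.is_cached, rdd_g.getStorageLevel())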
For the two functions above, f() does not ask the rdd to persist -- but g() does, at the beginning. When I time the two functions, f() and g(), I get very comparable performance, as if the cache() call had made no difference. (In fact, the one that uses caching is slower.)
%%timeit
f()
> 1 loops, best of 3: 2.19 s per loop
%%timeit
g()
> 1 loops, best of 3: 2.7 s per loop
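To see whether anything changes after the first count() -- which is where the cached RDD should first be materialized -- I also sketched a per-iteration timing, reusing N, THRESHOLD and sc from above:

import time

def timed_counts(rdd):
    """Time each count() on its own, so the first materialization
    (and, for the cached RDD, the cache fill) shows up separately."""
    for i in range(10):
        start = time.perf_counter()
        n = rdd.filter(lambda x: x > i * THRESHOLD).count()
        print(i, n, time.perf_counter() - start)

timed_counts(sc.parallelize(range(N)))          # ephemeral, as in f()
timed_counts(sc.parallelize(range(N)).cache())  # cached, as in g()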
Actually, even modifying f() to call unpersist() on the RDD does not change things.
def ff():
    """modified f() with an explicit call to unpersist()"""
    rdd = sc.parallelize(range(N))
    for i in range(10):
        rdd.unpersist()
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())
%%timeit
ff()
> 1 loops, best of 3: 2.25 s per loop
The documentation for unpersist() states that it "mark[s] the RDD as non-persistent, and remove[s] all blocks for it from memory and disk." Is that really the case, though -- or does Spark ignore the unpersist() call when it knows it is going to use the RDD again down the road?
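The only check I can think of from the Python side (again reusing sc and N from above) only shows the driver-side bookkeeping, which I assume is not the same thing as the executors actually dropping the blocks:

rdd = sc.parallelize(range(N)).cache()
rdd.count()            # first action: materializes the RDD and should fill the cache
print(rdd.is_cached)   # True -- persistence was requested
rdd.unpersist()
print(rdd.is_cached)   # False -- but this is just the driver-side flag;
                       # whether the blocks are really freed is exactly my question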