I've got a Spark program that essentially does this:
    def foo(a: RDD[...], b: RDD[...]) = {
      val c = a.map(...)
      c.persist(StorageLevel.MEMORY_ONLY_SER)
      var current = b
      for (_ <- 1 to 10) {
        val next = some_other_rdd_ops(c, current)
        next.persist(StorageLevel.MEMORY_ONLY)
        current.unpersist()
        current = next
      }
      current.saveAsTextFile(...)
    }
The strange behavior I'm seeing is that the Spark stages corresponding to val c = a.map(...) run 10 times. I would have expected that to happen only once, because c is persisted on the very next line, but that's not the case. When I look at the "Storage" tab of the running job, very few of the partitions of c are cached.
Also, 10 copies of that stage immediately show as "active". 10 copies of the stage corresponding to val next = some_other_rdd_ops(c, current) show up as pending, and they roughly alternate execution.
Am I misunderstanding how to get Spark to cache RDDs?
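For reference, here is a minimal sketch of one way to observe when a persisted RDD is actually computed. It is not taken from my program: the object name PersistIsLazy, the 100-element input, and the use of sc.longAccumulator (which needs Spark 2.0 or later) are all assumptions for illustration. It relies only on the fact that persist marks an RDD for caching and that nothing is computed until an action runs.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical helper, not part of the original program.
object PersistIsLazy {
  // Returns how many times the map function has run
  // (1) right after persist, (2) after a first action, (3) after a second action.
  def run(sc: SparkContext): Seq[Long] = {
    // Accumulator that counts evaluations of the map function
    val evaluations = sc.longAccumulator("evaluations")

    val a = sc.parallelize(1 to 100, 4)
    val c = a.map { x => evaluations.add(1); x * 2 }
    c.persist(StorageLevel.MEMORY_ONLY_SER)

    // persist only marks the RDD for caching; no job has run yet
    val afterPersist = evaluations.sum

    // The first action computes c and caches its partitions as tasks finish
    c.count()
    val afterFirstAction = evaluations.sum

    // A later action should read the cached partitions, if they all fit in memory
    c.count()
    val afterSecondAction = evaluations.sum

    Seq(afterPersist, afterFirstAction, afterSecondAction)
  }
}
```

If the third number is larger than the second, some partitions of c were recomputed rather than read from the cache. In foo above, no action runs until saveAsTextFile, so the persist call on c has not cached anything by the time the loop has finished building its lineage.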
Edit: here is a gist containing a program to reproduce this: https://gist.github.com/jfkelley/f407c7750a086cdb059c. It expects as input the edge list of a graph (with edge weights). For example:
a b 1000.0
a c 1000.0
b c 1000.0
d e 1000.0
d f 1000.0
e f 1000.0
g h 1000.0
h i 1000.0
g i 1000.0
d g 400.0
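To make the input format concrete, here is a minimal sketch of parsing one line of such an edge list. The Edge case class and the EdgeListParser object are hypothetical names for illustration, and the gist may well parse its input differently.

```scala
import scala.util.Try

// Hypothetical record type; the gist may represent edges differently.
final case class Edge(src: String, dst: String, weight: Double)

object EdgeListParser {
  // Parses one whitespace-separated line such as "a b 1000.0".
  // Returns None if the line does not have exactly three fields
  // or if the third field is not a valid number.
  def parseLine(line: String): Option[Edge] =
    line.trim.split("\\s+") match {
      case Array(src, dst, w) => Try(w.toDouble).toOption.map(Edge(src, dst, _))
      case _                  => None
    }
}
```

With an RDD of lines, something like lines.flatMap(l => EdgeListParser.parseLine(l)) would keep only the well-formed edges.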
Lines 31-42 of the gist correspond to the simplified version above. I get 10 stages corresponding to line 31 when I would only expect 1.
[...] c. I'm not sure that's the case though. – Daniel Darabos
[...] current.unpersist() statement you have. Are you sure that c never becomes current? – marios
[...] c.join(current.map(...)).aggregateByKey(...).mapValues(...). No persist/unpersist, collect, saveToTextFile, etc. – Joe K