Here's the situation: I have a constantly growing collection of data that I want to process as an RDD across a Hadoop cluster.
Here is a short example:
val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))
val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
map(x => (x._1, x._2)).setName("testRDD").cache()
val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))
val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
map(x => (x._1, x._2)).setName("testAdd")
val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)
The result will look like this (the order of elements can vary):
(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))
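For reference, I'm printing the result with something like this (just a sanity check, the exact call doesn't matter):
// collect the merged lists to the driver and print them; output order can vary
testResult.collect().foreach { case (k, vs) => println((k, vs.toList)) }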
Here are my goals:
- I want to .cache() my RDD in cluster memory.
- I want to have the ability to add new elements to the existing RDD.
Here's what I've figured out:
- Each partition of an RDD is cached separately and in its entirety (for example, I had a collection with 100 elements in 4 partitions; calling .cache().collect() cached all 4 partitions, while .cache().first() cached only 1).
- The result of testRDD.cogroup(testAdd) is a new RDD that can be cached again, and if we use var testRDD and call testRDD = testRDD.cogroup(testAdd), we lose the reference to the cached data.
- I know that RDDs are best suited for batch applications, and I do have that aspect here: the Seq() for each new element will be computed from the properties of the other elements.
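The best workaround I see for the second point is to cache the merged RDD and only then drop the old copy. A minimal sketch of what I mean (combineAndRecache is just a name I made up, and I'm assuming it's acceptable to re-cache the whole merged dataset):
import org.apache.spark.rdd.RDD

// Sketch: build the merged RDD, cache it, and only then release the old cached copy.
def combineAndRecache(current: RDD[(Int, Seq[Int])],
                      additions: RDD[(Int, Seq[Int])]): RDD[(Int, Seq[Int])] = {
  val combined = current.cogroup(additions)
    .mapValues { case (oldSeqs, newSeqs) => (oldSeqs ++ newSeqs).flatten.toSeq }
    .setName("testRDD")
    .cache()
  combined.count()      // force materialization so the new copy actually gets cached
  current.unpersist()   // drop the old cached partitions only after the new ones exist
  combined
}
But this still rebuilds and re-caches everything, which is exactly what I'd like to avoid.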
Is there any way to modify the current RDD without removing all of its elements from the cache?
I thought about keeping a kind of temporary storage and merging it with the current storage once the temporary storage reaches some size limit...
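Something along these lines is what I have in mind (just a sketch: buffer, bufferLimit and addElement are names I'm making up, I'm assuming the buffer fits in driver memory, and combineAndRecache is the helper from the sketch above):
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

val buffer = ArrayBuffer.empty[(Int, Seq[Int])]   // temporary storage on the driver
val bufferLimit = 1000                            // assumed threshold, tune as needed

// Buffer new elements; merge them into the cached RDD only when the buffer is full,
// reusing combineAndRecache from the sketch above.
def addElement(elem: (Int, Seq[Int]),
               current: RDD[(Int, Seq[Int])]): RDD[(Int, Seq[Int])] = {
  buffer += elem
  if (buffer.size < bufferLimit) {
    current                                       // keep using the already cached RDD
  } else {
    val additions = sc.parallelize(buffer.toList).setName("testAdd")
    buffer.clear()
    combineAndRecache(current, additions)         // re-cache merged data, drop old copy
  }
}
This at least amortizes the re-caching over many additions, but the whole dataset still gets re-cached on every merge.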