1
votes

Here's situation: I have a constantly growing collection of data, which I want to process using RDD across the Hadoop cluster.

Here is the short example:

val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))

val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
    map(x => (x._1, x._2)).setName("testRDD").cache()

val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))

val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
    map(x => (x._1, x._2)).setName("testAdd")

val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)

The result will be like this (order of elements can vary):

(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))

Here's my goals:

  1. I want to .cache() my RDD in cluster memory.
  2. I want to have the ability to add new elements to the existing RDD.

Here's what I've figured out:

  1. Each partition in RDD caches separately and entirely (for example, I had the collection with 100 elements and 4 partitions, I called .cache().collect() and cache().first() and got 4 cached partitions in first case and 1 in second case).
  2. The result of testRDD.cogroup(testAdd) is new RDD, that could be cached again, and if we'll try to use var testRDD and call testRDD = testRDD.cogroup(testAdd), we'll lose the link to the cached data.
  3. I know, that RDD is most suiltable for batch applications, and I have this here: the Seq() for each new element will be computed from the properties of another elements.

Is there any way to modify current RDD without removing all of it's elements from cache?

I though about making a kind of temporary storage and merging temporary storage with current storage after reaching some limit on temporary storage...

1

1 Answers

1
votes

RDDs are immutable so you can't add new elements to them. You can however create a new RDD by unioning your original RDD with the new elements, similarly to what you did with your testResult RDD.

If you want to use the same variable for the new RDD with the updates you could use a var rather that a val for that RDD. e.g.

var testRDD = sc.parallelize(...) val testAdd = sc.parallelize(...) testRDD = testRDD.union(testAdd) testRDD.cache()

This will create a lineage joining the two original RDDs. This could cause issues if you call union on testRDD too many times. To fix that you can call checkpoint on testRDD after it has been union-ed so many times, say every 10 updates. You could also consider calling repartion on testRDD when checkpointing.

All elements added to testRDD should stay in the cache using this technique.