Here's the situation: I have a constantly growing collection of data that I want to process as an RDD across a Hadoop cluster.
Here is a short example:
    val elementA = (1, Seq(2, 3))
    val elementB = (2, Seq(1, 3))
    val elementC = (3, Seq(1, 2))

    val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
      map(x => (x._1, x._2)).setName("testRDD").cache()

    val elementD = (4, Seq(1, 3))
    val elementD1 = (1, Seq(4))
    val elementD2 = (3, Seq(4))

    val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
      map(x => (x._1, x._2)).setName("testAdd")

    val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)

The result will be like this (the order of elements can vary):
    (1, List(2, 3, 4))
    (2, List(1, 3))
    (3, List(1, 2, 4))
    (4, List(1, 3))

Here are my goals:
- I want to .cache() my RDD in cluster memory (see the small experiment right after this list).
- I want to be able to add new elements to the existing RDD.
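For the first goal, here is the small experiment I used to check how caching behaves per partition (just a sketch: the 100-element RDD and the name "partial" are made up for illustration, and sc.getRDDStorageInfo is a developer API that, as far as I can tell, reports the number of cached partitions per RDD):

    // 100 elements in 4 partitions, marked for caching but not yet computed.
    val partial = sc.parallelize(1 to 100, 4).setName("partial").cache()

    // first() only computes one partition, so only one partition gets cached.
    partial.first()
    sc.getRDDStorageInfo.filter(_.name == "partial").foreach { info =>
      println(s"${info.name}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached")
    }

    // collect() computes every partition, so all 4 partitions end up cached.
    partial.collect()
    sc.getRDDStorageInfo.filter(_.name == "partial").foreach { info =>
      println(s"${info.name}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached")
    }

The first point in the list below is based on this observation.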
Here's what I've figured out:
- Each partition of an RDD is cached separately and in its entirety. For example, I had a collection of 100 elements in 4 partitions: after .cache().collect() I got 4 cached partitions, but after .cache().first() only 1, because first() computes just one partition.
- The result of testRDD.cogroup(testAdd) is a new RDD that can be cached again, but if I use var testRDD and call testRDD = testRDD.cogroup(testAdd), I lose the reference to the previously cached data (see the sketch after this list).
- I know that RDDs are best suited to batch applications, but in my case the Seq() of each new element is computed from the properties of other elements.
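To illustrate the second point, this is roughly the pattern I end up with (only a sketch based on the example above; the explicit count() and unpersist() calls and the name "mergedRDD" are my own additions, and I am not sure this is the right approach):

    import org.apache.spark.rdd.RDD

    // Keep a mutable reference to the currently cached RDD.
    var currentRDD: RDD[(Int, Seq[Int])] = testRDD

    // cogroup builds a brand-new RDD; caching it does not touch the old one.
    val merged = currentRDD.cogroup(testAdd)
      .mapValues(x => (x._1 ++ x._2).flatten.toSeq)
      .setName("mergedRDD")
      .cache()

    // Materialize the new RDD while the old cache is still in memory,
    // so the cogroup reads cached data instead of recomputing it.
    merged.count()

    // Re-assigning the var only drops my reference; the old cached blocks
    // stay in memory until they are unpersisted explicitly.
    currentRDD.unpersist()
    currentRDD = merged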
Is there any way to modify the current RDD without evicting all of its elements from the cache?
I thought about making a kind of temporary storage and merging it into the main storage once the temporary storage reaches some size limit...
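Something like this is what I have in mind (just a rough sketch; addElement, buffer and bufferLimit are names I made up for illustration, and I have not measured whether this is efficient):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD

    // Driver-side temporary storage for new elements.
    val buffer = ArrayBuffer.empty[(Int, Seq[Int])]
    val bufferLimit = 1000  // merge once this many new elements have accumulated

    def addElement(element: (Int, Seq[Int]),
                   current: RDD[(Int, Seq[Int])]): RDD[(Int, Seq[Int])] = {
      buffer += element
      if (buffer.size < bufferLimit) {
        current  // keep serving reads from the existing cached RDD
      } else {
        // Merge the buffered elements into the cached RDD and cache the result.
        // toList takes an immutable snapshot, so clearing the buffer later is safe.
        val addition = sc.parallelize(buffer.toList)
        val merged = current.cogroup(addition)
          .mapValues(x => (x._1 ++ x._2).flatten.toSeq)
          .cache()
        merged.count()      // materialize before dropping the old cache
        current.unpersist()
        buffer.clear()
        merged
      }
    }

Is this the right direction, or is there a better way to handle a growing dataset with cached RDDs?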