I am new to Apache Spark. I have an RDD of groups, and I want to keep only the groups whose total weight is larger than a constant value. The item weights are themselves stored in an RDD of (item, weight) pairs. Here is a small demo: the groups to be filtered are stored in "groups", and the constant value is 12:
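val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
// Pull every (item, weight) pair onto the driver and build a local Map
// (toArray is deprecated in favor of collect() since Spark 1.0)
val wm = weights.toArray.toMap
// A group is "heavy" if the weights of its comma-separated items sum to more than 12
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)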
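If the weight table is small enough to fit in memory (an assumption the question does not confirm), a common refinement of this first version is to broadcast the map instead of capturing it in the filter closure, so each executor receives one read-only copy rather than one copy per task. The sketch below assumes Spark 1.0 or later; the object name HeavyGroupsBroadcast and the function heavyGroups are hypothetical. It does not remove the need to collect the whole table on the driver, so it will not help if the weights themselves are too large.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (collectAsMap); only needed on Spark < 1.3
import org.apache.spark.rdd.RDD

object HeavyGroupsBroadcast {
  // Keep the groups whose total item weight exceeds `threshold`.
  // The weight map is built once on the driver and broadcast, so each
  // executor holds one read-only copy instead of one per task closure.
  def heavyGroups(sc: SparkContext,
                  groups: RDD[String],
                  weights: RDD[(String, Int)],
                  threshold: Int): RDD[String] = {
    // Still materializes the full weight table on the driver:
    // this only helps when that table fits in driver and executor memory.
    val wm = sc.broadcast(weights.collectAsMap())
    // Like the original wm(_), this throws if an item has no weight.
    groups.filter(g => g.split(",").map(item => wm.value(item)).sum > threshold)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("heavy-groups-broadcast").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
      val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
      heavyGroups(sc, groups, weights, 12).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In the Spark shell, where sc already exists, you would call HeavyGroupsBroadcast.heavyGroups(sc, groups, weights, 12).collect() directly instead of running main, since main creates its own SparkContext.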
When the input data is very large (more than 10 GB, for example), I always get a "java heap out of memory" error. I suspect it is caused by "weights.toArray.toMap", because that converts a distributed RDD into a single Java object in the driver JVM. So I tried to filter using the weights RDD directly:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)
When I ran result.collect after loading this script into the Spark shell, I got a java.lang.NullPointerException. Someone told me that using one RDD inside a transformation of another RDD causes a NullPointerException, and suggested that I put the weights into Redis.
So how can I compute "result" without converting "weights" to a Map or putting it into Redis? Is there a way to filter an RDD based on another map-like RDD without the help of an external datastore? Thanks!
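The NullPointerException is consistent with a known Spark restriction: RDD transformations and actions can only be invoked by the driver, not inside the closure of another transformation, so weights cannot be queried from within groups.filter. One way to stay entirely within RDD operations is to turn the lookup into a join: explode each group into (item, group) pairs, join them with the weights RDD on the item name, sum the weights per group with reduceByKey, and keep the groups above the threshold. The sketch below assumes Spark 1.0 or later (for zipWithUniqueId); the object name HeavyGroupsJoin and the function heavyGroups are hypothetical, not an established API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (join, reduceByKey); only needed on Spark < 1.3
import org.apache.spark.rdd.RDD

object HeavyGroupsJoin {
  // Keep the groups whose total item weight exceeds `threshold`,
  // using only RDD-to-RDD operations: no driver-side Map, no external store.
  def heavyGroups(groups: RDD[String],
                  weights: RDD[(String, Int)],
                  threshold: Int): RDD[String] = {
    groups
      .zipWithUniqueId()                           // (groupString, id): keeps duplicate groups distinct
      .flatMap { case (g, id) =>
        g.split(",").map(item => (item, (id, g)))  // one (item, groupKey) pair per item
      }
      .join(weights)                               // (item, ((id, g), weight)); items without a weight are dropped
      .map { case (_, (key, w)) => (key, w) }      // re-key by group
      .reduceByKey(_ + _)                          // ((id, g), total weight of the group)
      .filter { case (_, total) => total > threshold }
      .map { case ((_, g), _) => g }               // recover the original group string
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("heavy-groups-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
      val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
      heavyGroups(groups, weights, 12).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In the Spark shell you would call HeavyGroupsJoin.heavyGroups(groups, weights, 12).collect() with the existing sc rather than running main. The trade-off is a shuffle for the join and one for reduceByKey, in exchange for never holding the whole weight table on a single machine. Note one behavioral difference from the original isheavy: an item missing from weights is silently dropped by the inner join (effectively weight 0) instead of throwing.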