I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with only a few keys, but each key has a large number (about 100k) of values. I want to somehow repartition and turn each Iterable&lt;v&gt; into an RDD[v] so that I can further apply map, reduce, sortBy, etc. effectively on those values. I sense flatMapValues is my friend, but I want to check with other Spark users. This is for a real-time Spark app. I have already tried collect() and computing all the measures in the app server's memory, but I am trying to improve on that.
This is what I have tried (pseudocode):
class ComputeMetrices {
  transient JavaSparkContext sparkContext;

  /**
   * This method computes 3 measures: 2 percentiles over different values and 1 histogram.
   * @param javaPairRdd
   * @return
   */
  public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {
    JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr -> {
      MetricsSummary ms = new MetricsSummary();
      List<Double> list1 = new ArrayList<>();
      List<Double> list2 = new ArrayList<>();
      for (InputData d : itr) {
        list1.add(d.height);
        list2.add(d.weight);
      }
      // Here I want to convert the lists above into RDDs
      JavaRDD<Double> javaRdd1 = sparkContext.parallelize(list1); // NullPointerException here:
        // sparkContext is transient and only exists on the driver, so it is null inside
        // this transformation, which runs on the executors
      JavaRDD<Double> javaRdd2 = sparkContext.parallelize(list2);
      JavaPairRDD<Double, Long> javaPairRdd1 = javaRdd1.sortBy(...).zipWithIndex();
      JavaPairRDD<Double, Long> javaPairRdd2 = javaRdd2.sortBy(...).zipWithIndex();
      // The two pair RDDs above would then be used to find percentile values over the range 0..100.
      // Omitting the percentile algorithm for the sake of brevity.
      double[] percentile1 = ...; // computed from javaPairRdd1
      double[] percentile2 = ...; // computed from javaPairRdd2
      ms.percentile1(percentile1);
      ms.percentile2(percentile2);
      // Compute the histogram
      JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1);
      long[] hist = dRdd.histogram(10);
      ms.histo(hist);
      return ms;
    });
    return rdd.collectAsMap();
  }
}
I want to create an RDD out of the Iterable returned by groupByKey so that I can use further Spark transformations on it.
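For reference, this is roughly what the omitted per-key computation looks like when done with plain Java collections instead of nested RDDs (which is what I currently fall back to): the percentile uses the nearest-rank method on a sorted copy, and the histogram mimics JavaDoubleRDD.histogram(int) with equal-width buckets. The class and helper names here are just placeholders of mine, not part of the real app.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PerKeyMetrics {

    // Placeholder helper: p-th percentile (p in 0..100) via the nearest-rank
    // method on a sorted copy of the values.
    static double percentile(List<Double> values, int p) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank - 1, 0));
    }

    // Placeholder helper: equal-width histogram with `buckets` bins over
    // [min, max], similar in spirit to JavaDoubleRDD.histogram(int).
    static long[] histogram(List<Double> values, int buckets) {
        double min = Collections.min(values);
        double max = Collections.max(values);
        double width = (max - min) / buckets;
        long[] counts = new long[buckets];
        for (double v : values) {
            int bin = (width == 0) ? 0 : (int) ((v - min) / width);
            if (bin == buckets) bin = buckets - 1; // the max value lands in the last bin
            counts[bin]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Double> heights = new ArrayList<>();
        for (int i = 1; i <= 100; i++) heights.add((double) i);
        System.out.println(percentile(heights, 50));   // 50.0
        System.out.println(histogram(heights, 10)[0]); // 10
    }
}
```

With ~100k values per key this runs inside the executor without any nested Spark context, which is why I am asking whether there is a way to get real RDD semantics per key instead.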