3
votes

I am trying to create new RDD based on given PairRDD. I have a PairRDD with few keys but each keys have large (about 100k) values. I want to somehow repartition, make each Iterable<v> into RDD[v] so that I can further apply map, reduce, sortBy etc effectively on those values. I am sensing flatMapValues is my friend but want to check with other sparkens. This is for real-time spark app. I have already tried collect() and computing all measures in-memory of app server but trying to improve upon it. This is what I try (psuedo)

class ComputeMetrices{

    transient JavaSparkContext sparkContext;

    /**
     * This method compute 3 measures: 2 percentiles of different values and 1 histogram 
     * @param javaPairRdd
     * @return
     */
    public Map<String, MetricsSummary> computeMetrices(JavaPairRDD<String, InputData> javaPairRdd) {

      JavaPairRDD<String, MetricsSummary> rdd = javaPairRdd.groupByKey(10).mapValues(itr => {

      MetricsSummary ms = new MetricsSummary();

      List<Double> list1 
      List<Double> list2

      itr.foreach{ list1.add(itr._2.height); list2.add(itr._2.weight)}
       //Here I want to convert above lists into RDD 
      JavaRDD<V> javaRdd1 = sparContext.parallelize(list1) //null pointer ; probably at sparkContext
      JavaRDD<V> javaRdd2 = sparContext.parallelize(list2)
      JavaPairRDD1 javaPairRdd1 = javaRdd1.sortBy.zipWithIndex()
      JavaPairRDD2 javaPairRdd2 = javaRdd2.sortBy.zipWithIndex()
      //Above two PairRDD will be used further to find Percentile values for range of (0..100)
      //Not writing percentile algo for sake of brevity
      double[] percentile1 = //computed from javaPairRdd1
      double[] percentile2 = //computed from javaPairRdd2
      ms.percentile1(percentile1)
      ms.percentile2(percentile2)
      //compute histogram
      JavaDoubleRDD dRdd = sparkContext.parallelizeDoubles(list1)
      long[] hist = dRdd.histogram(10)
      ms.histo(hist)
      return ms
      })
      return rdd.collectAsMap
    }
}

I want to create RDD out of that Iterable from groupByKey result so that I can user further spark transformations.

1
Could you please add an example? - Vijay Innamuri
@VijayInnamuri Do you mean other example than what I already posted? My problem is I can't find any way to create RDD from existing RDD or from Iterable during transformation - nir
Please post and example of the input input data structure and an example of the result you want your class to produce. - vvladymyrov
Edited example. I am trying to compute multiple measures from one RDD. and as you see to do that I am trying to create multiple RDD so that I can compute those measure in more distributed fashion rather then on just one node. - nir

1 Answers

0
votes

The reason why sparContext is null is that the code inside your mapValues is executed on a worker - there are no sparContext available on a worker, it is available only on driver.

If I understand your code I can tell that there is no need to create if you want mapValues to produce sorted and indexes pairs.

Please keep in mind that result of that code would look like:

RDD(String, V) ->groupByKey-> RDD(String, List(V)) 
->mapValues-> RDD(String, List(Int,V))

i.e.

key1, List((0,V1), (0,V2)
key1, List((0,V1), (0,V2)

mapValues is applied to every V inside of grouped List independently. So counter will be always 0.

If you want to convert emit multiple RDDs out of single RDD with K, List(V) than flatMapValues will help you. There is still question - how efficient will be streaming operations over new rdd - map and reduce will work for sure, but sortBy would depend on the size of your window.

RDD(K, List(V)) -> flatMapValues(x=>x) -> RDD((K, V1), (K, V2) ... )