1
votes

I have a spark java program where a groupByKey with a mapValues step is done and it returns a PairRDD with value as an Iterable of all the input rdd values. I have read that replacing reduceByKey in the place of groupByKey with mapValues will give a performance gain, but i don't know how to apply reduceByKey to my problem here.

Specifically i have the an input pair RDD which has value with type Tuple5. After the groupByKey and mapValues transformations, i need to get a Key-Value pair RDD where the value needs to be an Iterable of the input values.

JavaPairRDD<Long,Tuple5<...>> inputRDD;
...
...
...
JavaPairRDD<Long, Iterable<Tuple5<...>>> groupedRDD = inputRDD
    .groupByKey()
    .mapValues(
            new Function<Iterable<Tuple5<...>>,Iterable<Tuple5<...>>>() {

                @Override
                public Iterable<Tuple5<...>> call(
                        Iterable<Tuple5<...>> v1)
                        throws Exception {

                    /*
                    Some steps here..                               
                    */

                    return mappedValue;
                }
            });

Is there a way by which i could get the above transformation using reduceByKey?

1
What are Some steps here? You'll need a logic to reduce it with. - philantrovert
In the mapValues function i am actually sorting each value based on a key within Tuple5. I thought it wasn't relevant here, that's why i didn't include them. - Vishnu P N
I have read that replacing reduceByKey in the place of groupByKey with mapValues will give a performance gain - you've read wrong. - zero323

1 Answers

0
votes

I've been using Scala on Spark, so this isn't going to be the exact answer you might prefer. The main difference in coding between groupByKey/mapValues and reduceByKey can be seen using a trivial example adapted from this article:

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithGroup = wordPairsRDD.
  groupByKey.
  mapValues(_.sum)
wordCountsWithGroup.collect
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

val wordCountsWithReduce = wordPairsRDD.
  reduceByKey(_ + _)
wordCountsWithReduce.collect
res2: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

In this example, where x => x.sum (i.e. _.sum) is used in mapValues, it'll be (acc, x) => acc + x (i.e. _ + _) in reduceByKey. The function signatures are vastly different. In mapValues, you're processing a collection of the grouped values, whereas in reduceByKey you're performing a reduction.