1 vote

I have a key-value pair RDD. The RDD contains some elements with duplicate keys, and I want to split the original RDD into two RDDs: one stores elements with unique keys, and the other stores the remaining elements. For example:

Input RDD (6 elements in total):

<k1,v1>, <k1,v2>, <k1,v3>, <k2,v4>, <k2,v5>, <k3,v6>

Result:

Unique keys RDD (stores one element per key; when multiple elements share a key, any one of them is accepted):

<k1,v1>, <k2,v4>, <k3,v6>

Duplicated keys RDD (stores the remaining elements with duplicated keys):

<k1,v2>, <k1,v3>, <k2,v5>

In the above example, the unique RDD has 3 elements, and the duplicated RDD has 3 elements as well.

I tried groupByKey() to group elements with the same key together, so that for each key there is a sequence of elements. However, the performance of groupByKey() is poor because the element values are very large, which leads to a very large shuffle write.
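Roughly, what I tried looks like this (a sketch; inputRdd stands for my input RDD[(K,V)]):

val grouped = inputRdd.groupByKey()              // RDD[(K, Iterable[V])]; every value gets shuffled

val uniqueRdd = grouped.mapValues(_.head)        // keep any one element per key

val duplicateRdd = grouped.flatMapValues(_.tail) // the remaining elements with duplicated keys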

So I was wondering if there is any better solution. Or is there a way to reduce the amount of data being shuffled when using groupByKey()?

How can the total number of elements in UniqueRDD and DuplicatedRDD be the same? - mrsrinivas
Corrected the post: #InputRDD = #UniqueRDD + #DuplicatedRDD - michelle

2 Answers

2 votes

EDIT: given the new information in the edit, I would first create the unique RDD, and then the duplicate RDD using the unique one and the original one:

val inputRdd: RDD[(K, V)] = ...

val uniqueRdd: RDD[(K, V)] = inputRdd.reduceByKey((x, y) => x) // keep just a single value for each key

val duplicateRdd = inputRdd
  .join(uniqueRdd)                             // pair every input element with the value kept in uniqueRdd
  .filter { case (k, (v1, v2)) => v1 != v2 }   // keep only elements whose value differs from the one in uniqueRdd
  .map { case (k, (v1, v2)) => (k, v1) }       // v2 came from uniqueRdd

There is also some room for optimization.
In the solution above there will be 2 shuffles (reduceByKey and join).
If we repartition inputRdd by key from the start, we won't need any additional shuffles, and the code should perform much better:

val inputRdd2 = inputRdd.partitionBy(new HashPartitioner(partitions = 200))
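Spelled out a bit more as a sketch (the named partitioner mirrors the line above, 200 partitions is just an example value, and HashPartitioner is org.apache.spark.HashPartitioner):

val partitioner = new HashPartitioner(200)                 // same partitioner for every step
val inputRdd2 = inputRdd.partitionBy(partitioner).cache()  // one shuffle here; cached because it is used twice

val uniqueRdd2 = inputRdd2.reduceByKey(partitioner, (x, y) => x)  // already partitioned by key, so no extra shuffle

val duplicateRdd2 = inputRdd2
  .join(uniqueRdd2, partitioner)                 // both sides share the same partitioner, so the join is shuffle-free
  .filter { case (k, (v1, v2)) => v1 != v2 }
  .map { case (k, (v1, v2)) => (k, v1) }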

Original Solution:

You can try the following approach:
first count the number of occurrences of each (key, value) pair, and then split into the two RDDs.

val inputRdd: RDD[(K, V)] = ...

val countRdd: RDD[((K, V), Int)] = inputRdd
  .map((_, 1))            // ((k, v), 1) for every element
  .reduceByKey(_ + _)     // count occurrences of each (key, value) pair
  .cache()

val uniqueRdd = countRdd.map(_._1)    // one element per distinct (key, value) pair

val duplicateRdd = countRdd
  .filter(_._2 > 1)
  .flatMap { case (kv, count) =>
    (1 to count - 1).map(_ => kv)     // emit the remaining count - 1 copies
  }
0 votes

Please use combineByKey, which applies a combiner on the map task and hence reduces the amount of shuffled data.

The combiner logic depends on your business logic.

http://bytepadding.com/big-data/spark/groupby-vs-reducebykey/

There are multiple ways to reduce shuffle data:
 1. Write less data from the map task by using a combiner.
 2. Send aggregated, serialized objects from map to reduce.
 3. Use combine input formats to enhance the efficiency of combiners.
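As a sketch of point 1 for this problem, here is a combiner that just keeps the first value seen per key (equivalent to reduceByKey((x, y) => x), using the same placeholder types K and V as the other answer); the three merge functions are where your own business logic would go:

val uniqueRdd = inputRdd.combineByKey(
  (v: V) => v,             // createCombiner: the first value becomes the combiner
  (c: V, v: V) => c,       // mergeValue: map-side combiner ignores further values for the key
  (c1: V, c2: V) => c1     // mergeCombiners: keep the combiner from one partition
)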