1
votes

I am using Spark 1.2 with Scala and have a pair RDD of (String, String). Sample records look like:

<Key,  value>
id_1,  val_1_1; val_1_2
id_2,  val_2_1; val_2_2
id_3,  val_3_1; val_3_2
id_1,  val_4_1; val_4_2

I just want to remove all records with a duplicate key, so in the above example the fourth record will be removed because id_1 is a duplicate key.

Please help.

Thanks.

Where there are duplicate keys, how will you decide which value to keep? – mattinbits
It's just the first value that I need. – user2200660
The problem is that when Spark does a reduceByKey, as suggested in the answer below, you have no way to know which value will be picked. There's no guarantee that Spark maintains the ordering of the rows. Is there something about the value (such as the fact it is _1_1) that you can use to differentiate? – mattinbits
@mattinbits, zipWithIndex first, then in the reduce just keep the one with the lowest index, then map afterwards to remove the index. Viola! No, wait, that's a large violin. Voila! – The Archetypal Paul

2 Answers

11
votes

You can use reduceByKey:

val rdd: RDD[(K, V)] = // ...
// keeps one value per key; which duplicate survives is not guaranteed,
// since reduceByKey gives no ordering guarantee across partitions
val res: RDD[(K, V)] = rdd.reduceByKey((v1, v2) => v1)
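
For example, applied to the sample data from the question (a minimal sketch, assuming a SparkContext named sc is already available, e.g. in the Spark shell):

// sample pair RDD matching the question's data
val pairs = sc.parallelize(Seq(
  ("id_1", "val_1_1; val_1_2"),
  ("id_2", "val_2_1; val_2_2"),
  ("id_3", "val_3_1; val_3_2"),
  ("id_1", "val_4_1; val_4_2")))

// one record per key; which of the duplicate values is kept is arbitrary
val deduped = pairs.reduceByKey((v1, v2) => v1)
deduped.collect().foreach(println)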
1
votes

If it is necessary to always select the first entry for a given key, then, combining @JeanLogeart's answer with the comment from @Paul:

import org.apache.spark.{SparkContext, SparkConf}

val data = List(
  ("id_1", "val_1_1; val_1_2"),
  ("id_2", "val_2_1; val_2_2"),
  ("id_3", "val_3_1; val_3_2"),
  ("id_1", "val_4_1; val_4_2"))

val conf = new SparkConf().setMaster("local").setAppName("App")
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(data)

// tag each record with its original position, keep the entry with the
// lowest index for each key, then drop the index again
val resultRDD = dataRDD.zipWithIndex.map {
  case ((key, value), index) => (key, (value, index))
}.reduceByKey((v1, v2) => if (v1._2 < v2._2) v1 else v2).mapValues(_._1)

resultRDD.collect().foreach(v => println(v))
sc.stop()
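
On the sample data this should keep the first occurrence for id_1, printing (id_1,val_1_1; val_1_2), (id_2,val_2_1; val_2_2) and (id_3,val_3_1; val_3_2), though the order of the collected output may vary.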