
I have a Spark RDD that looks like this:

[(1, ...),
(1, ...),
(2, ...),
(3, ...)]

I am trying to remove the records that have duplicate keys; in this case, I want to exclude all records with key '1'. The final output should look like:

[(2, ...),
(3, ...)]

Here is what I have tried so far. It works, but my gut says there should be a better solution:

>>> a = sc.parallelize([(1, [1, 1]), (1, [1, 1]), (2, [1, 1]), (3, [1, 1])])
>>> print a.groupByKey() \
    .filter(lambda x: len(x[1]) == 1) \
    .map(lambda x: (x[0], list(x[1])[0])).collect()
[(2, [1, 1]), (3, [1, 1])]

Can anyone help me with this?

Could you either accept the answer or explain why it doesn't work so it can be improved? Thanks in advance :) – zero323
Also, could you take a look at stackoverflow.com/q/33157978/1560062? If you don't find the answer useful I'll just delete it. – zero323

1 Answer


Two other options:

  1. subtractByKey - this requires shuffling, so the total cost can be similar to groupByKey. Optionally you can partition the input RDD up front and map with preservesPartitioning set to True (see the sketch after the list):

    from operator import add
    
    # count how many records share each key
    counts = (a.keys()
        .map(lambda x: (x, 1))
        .reduceByKey(add))
    
    # keys seen more than once; the value is irrelevant for subtractByKey
    duplicates = (counts
        .filter(lambda x: x[1] > 1)
        .map(lambda x: (x[0], None)))
    
    a.subtractByKey(duplicates)
    
  2. Broadcast variable (a quick end-to-end check follows after the list):

    • positive filter - if you expect a large number of duplicates

      # keys that occur exactly once, collected to the driver and broadcast
      non_duplicated = sc.broadcast(set(
          counts.filter(lambda x: x[1] == 1).keys().collect()
      ))
      
      a.filter(lambda x: x[0] in non_duplicated.value)
      
    • negative filter - if you expect a low number of duplicates

      # keys that occur more than once, collected to the driver and broadcast
      duplicated = sc.broadcast(set(
          counts.filter(lambda x: x[1] > 1).keys().collect()
      ))
      
      a.filter(lambda x: x[0] not in duplicated.value)
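A minimal, self-contained sketch of the optional partitioning variant mentioned in option 1, assuming the sample RDD `a` from the question; the partition count (4) and the names `num_parts`, `a_part` and `result` are illustrative assumptions, not part of the original answer:

    from operator import add
    
    num_parts = 4                               # assumed partition count; tune for your data
    a_part = a.partitionBy(num_parts).cache()   # partition the input RDD once
    
    counts = (a_part
        .map(lambda kv: (kv[0], 1), preservesPartitioning=True)  # keep the partitioner
        .reduceByKey(add))                                        # per-key record counts
    
    duplicates = (counts
        .filter(lambda x: x[1] > 1)
        .map(lambda x: (x[0], None), preservesPartitioning=True))
    
    result = a_part.subtractByKey(duplicates)
    print(result.collect())   # e.g. [(2, [1, 1]), (3, [1, 1])] (ordering not guaranteed)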
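And a quick end-to-end check of the broadcast-based negative filter against the same sample RDD, reusing the `duplicated` broadcast defined in the last bullet; the `deduplicated` name is just for illustration:

    deduplicated = a.filter(lambda x: x[0] not in duplicated.value)
    print(deduplicated.collect())   # e.g. [(2, [1, 1]), (3, [1, 1])]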