
I am new to Spark and stumbled upon the following (probably simple) problem.

I have an RDD of key-value elements, each value being a (string, number) pair. For instance, one key-value pair is ('A', ('02', 43)).

I want to reduce this RDD by keeping, for each key, the element (the key and the whole value) whose number is maximal.

reduceByKey() seems relevant, and I went with this MWE:

sc = spark.sparkContext
rdd = sc.parallelize([
    ('A', ('02', 43)),
    ('A', ('02', 36)),
    ('B', ('02', 306)),
    ('C', ('10', 185))])
rdd.reduceByKey(lambda a, b: max(a[1], b[1])).collect()

which produces

[('C', ('10', 185)), ('A', 43), ('B', ('02', 306))]

My problem here is that I would like to get:

[('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]

i.e., I don't see how to return ('A', ('02', 43)) and not simply ('A', 43).


2 Answers


I found a solution to this simple problem myself: define a named function instead of an inline lambda for reduceByKey(). That is:

def max_compound(a, b):
    # Return the whole (string, number) pair whose number is larger.
    if max(a[1], b[1]) == a[1]:
        return a
    else:
        return b

and call:

rdd.reduceByKey(max_compound).collect()
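
A shorter alternative that keeps the lambda is to let Python's built-in max() choose between the whole pairs via its key argument; a minimal sketch, equivalent to max_compound on this data:

# Compare the full values by their number (second component) and keep the winner.
rdd.reduceByKey(lambda a, b: max(a, b, key=lambda v: v[1])).collect()
# e.g. [('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]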

The following code is in Scala; I hope you can convert the same logic into PySpark.

val rdd = sparkSession.sparkContext.parallelize(Array(('A', (2, 43)), ('A', (2, 36)), ('B', (2, 306)), ('C', (10, 185))))

val rdd2 = rdd.reduceByKey((a, b) => (Math.max(a._1, b._1), Math.max(a._2, b._2)))

rdd2.collect().foreach(println)

output:

(B,(2,306))
(A,(2,43))
(C,(10,185))
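
For reference, a direct PySpark translation of that reducer might look like the sketch below. Note that, unlike max_compound, it takes the maximum of each tuple component independently; that happens to match the expected output here only because the string parts are equal within each key.

# Component-wise maximum over the (string, number) values, per key.
rdd.reduceByKey(lambda a, b: (max(a[0], b[0]), max(a[1], b[1]))).collect()
# e.g. [('C', ('10', 185)), ('A', ('02', 43)), ('B', ('02', 306))]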