0 votes

Say I have an RDD[(String, Int)] like the following example:

(A, 0)
(B, 0)
(C, 1)
(D, 0)
(E, 2)
(F, 1)
(G, 1)
(H, 3)
(I, 2)
(J, 0)
(K, 3)

I want to efficiently print the total number of records that contain 0, 1, 2, etc. Since the RDD contains millions of entries, I would like to do this as efficiently as possible.

The output of this example would return something like:

Number of records containing 0 = 4
Number of records containing 1 = 3
Number of records containing 2 = 2
Number of records containing 3 = 2

Currently I implement this by filtering the big RDD once per value (0, 1, 2, ...) and calling count() on each filtered result separately. I am using Scala.

Is there a more efficient way to do this? I already cache the RDD, but my program still runs out of memory (I have set the driver memory to 5G).
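Roughly, my current code looks like this (a simplified, self-contained sketch assuming a local SparkContext; each count() launches a separate Spark job, i.e. a full pass over the data):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("counts"))
val rdd = sc.parallelize(Seq(
  ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2), ("F", 1),
  ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3)
)).cache()

// One filter + count() per distinct value: a separate pass over the RDD each time.
for (v <- 0 to 3) {
  val n = rdd.filter(_._2 == v).count()
  println(s"Number of records containing $v = $n")
}
// Number of records containing 0 = 4
// Number of records containing 1 = 3
// Number of records containing 2 = 2
// Number of records containing 3 = 2
```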

EDIT: As suggested by Tzach I now use countByKey:

rdd.map(_.swap).countByKey()

Can I refine this by changing the string value to a tuple (whose second element is either "m" or "f"), and then obtain the counts per key for each unique value of that tuple's second element?

For example:

((A,m), 0)
((B,f), 0)
((C,m), 1)
((D,m), 0)
((E,f), 2)
((F,f), 1)
((G,m), 1)
((H,m), 3)
((I,f), 2)
((J,f), 0)
((K,m), 3)

Would result in

((0,m), 2)
((0,f), 2)
((1,m), 2)
((1,f), 1)
((2,m), 0)
((2,f), 2)
((3,m), 2)
((3,f), 0)

Thanks in advance!

1 Answer

2 votes

You can use the convenient countByKey for exactly that - just swap the pair elements in the input beforehand so the numeric value becomes the key:

val rdd = sc.parallelize(Seq(
  ("A", 0), ("B", 0), ("C", 1), ("D", 0), ("E", 2),
  ("F", 1), ("G", 1), ("H", 3), ("I", 2), ("J", 0), ("K", 3)
))

rdd.map(_.swap).countByKey().foreach(println)
// (0,4)
// (1,3)
// (3,2)
// (2,2)

EDIT: countByKey does exactly what it sounds like - so whatever key you want to count by, just transform your RDD to have that key as the left-hand side of the tuple, e.g.:

rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey()

or:

rdd.keyBy { case ((_, b), i) => (i, b) }.countByKey()
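Applied to the second dataset from the question, that looks like the following runnable sketch (assuming a local SparkContext). Note that countByKey only returns keys that actually occur, so zero-count combinations like (2,m) and (3,f) simply won't appear in the result:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("counts"))
val rdd = sc.parallelize(Seq(
  (("A", "m"), 0), (("B", "f"), 0), (("C", "m"), 1), (("D", "m"), 0),
  (("E", "f"), 2), (("F", "f"), 1), (("G", "m"), 1), (("H", "m"), 3),
  (("I", "f"), 2), (("J", "f"), 0), (("K", "m"), 3)
))

// Re-key each record by (value, gender) and count per key in a single pass.
val counts = rdd.map { case ((a, b), i) => ((i, b), a) }.countByKey()
counts.toSeq.sortBy(_._1).foreach(println)
// ((0,f),2)
// ((0,m),2)
// ((1,f),1)
// ((1,m),2)
// ((2,f),2)
// ((3,m),2)
```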