0
votes

I need to get all the columns along with a count, using a Scala RDD.

Col1  col2  col3  col4
us    A     Q1    10
us    A     Q3    10
us    A     Q2    20
us    B     Q4    10
us    B     Q5    20
uk    A     Q1    10
uk    A     Q3    10
uk    A     Q2    20
uk    B     Q4    10
uk    B     Q5    20

I want result like:

Col1  col2  col3  col4  count
us    A     Q1    10    3
us    A     Q3    10    3
us    A     Q2    20    3
us    B     Q4    10    2
us    B     Q5    20    2
uk    A     Q1    10    3
uk    A     Q3    10    3
uk    A     Q2    20    3
uk    B     Q4    10    2
uk    B     Q5    20    2

This is like a group-by on col1 and col2 that produces counts, except that I also need col3 and col4 in the output.

I am trying this with a Scala RDD:

val Top_RDD_1 = RDD.groupBy(f=> ( f._1,f._2 )).mapValues(_.toList)

This produces

RDD[((String, String), List[(String, String, String, Double, Double, Double)])]

That is, each element is ((col1, col2), List((col1, col2, col3, col4))), for example ((us, A), List((us, A, Q1, 10), (us, A, Q3, 10), (us, A, Q2, 20))).

How can I get the size of each list and access the values in it?

Please help me with the Spark Scala RDD code.

Thanks, Balaji.

2 Answers

1
votes

I can't see a way to do this in one "scan" of the RDD - you'll have to calculate the counts using reduceByKey and then join to the original RDD. To do that efficiently (without causing re-calculation of the input) you'd better cache/persist the input before the join:

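import org.apache.spark.rdd.RDD

// `input` is the original RDD[(String, String, String, Int)].
// Key every row by (col1, col2); cache because `keyed` is used twice below.
val keyed: RDD[((String, String), (String, String, String, Int))] = input
  .keyBy { case (c1, c2, _, _) => (c1, c2) }
  .cache()

// Count the rows per key.
val counts: RDD[((String, String), Int)] = keyed.mapValues(_ => 1).reduceByKey(_ + _)

// Join the counts back onto the rows and flatten to (col1, col2, col3, col4, count).
val result = keyed.join(counts).values.map {
  case ((c1, c2, c3, c4), count) => (c1, c2, c3, c4, count)
}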
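
To check the logic of this count-then-join approach without a Spark cluster, here is a minimal plain-Python sketch on the sample data from the question. It is not Spark code: `Counter` stands in for `mapValues` + `reduceByKey`, and a dictionary lookup stands in for `join`. The names `rows`, `keyed`, `counts` and `result` are illustrative only.

```python
from collections import Counter

# Sample rows from the question: (col1, col2, col3, col4)
rows = [
    ("us", "A", "Q1", 10), ("us", "A", "Q3", 10), ("us", "A", "Q2", 20),
    ("us", "B", "Q4", 10), ("us", "B", "Q5", 20),
    ("uk", "A", "Q1", 10), ("uk", "A", "Q3", 10), ("uk", "A", "Q2", 20),
    ("uk", "B", "Q4", 10), ("uk", "B", "Q5", 20),
]

# Step 1 (keyBy): pair each row with its (col1, col2) key.
keyed = [((r[0], r[1]), r) for r in rows]

# Step 2 (mapValues + reduceByKey): count the rows per key.
counts = Counter(key for key, _ in keyed)

# Step 3 (join + map): attach the per-key count to every original row.
result = [row + (counts[key],) for key, row in keyed]

for line in result:
    print(line)
```

Every original row survives with its group size appended, which is the shape the question asks for. In Spark the row order after the join is not guaranteed, unlike in this local list.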
0
votes

Here is the Python code:

# Sample RDD data; `sc` is assumed to be an existing SparkContext
sales = sc.parallelize([["US","A","Q1", 10], ["US","A","Q2", 20], ["US","B","Q3", 10], ["UK","A","Q1", 10], ["UK","A","Q2", 20], ["UK","B","Q3", 10]])

def func(data):
    ldata = list(data)                  # converting iterator class to list
    size = len(ldata)                   # count(*) of the list
    return [i + [size] for i in ldata]  # adding count(*) to the list

sales_count = sales.groupBy( lambda w: (w[0], w[1])).mapValues(func)
# Result: [(('US', 'A'), [['US', 'A', 'Q1', 10, 2], ['US', 'A', 'Q2', 20, 2]]), (('US', 'B'), [['US', 'B', 'Q3', 10, 1]]), (('UK', 'A'), [['UK', 'A', 'Q1', 10, 2], ['UK', 'A', 'Q2', 20, 2]]), (('UK', 'B'), [['UK', 'B', 'Q3', 10, 1]])]

finalResult = sales_count.flatMap(lambda res: res[1])
# Result:  [['US', 'A', 'Q1', 10, 2], ['US', 'A', 'Q2', 20, 2], ['US', 'B', 'Q3', 10, 1], ['UK', 'A', 'Q1', 10, 2], ['UK', 'A', 'Q2', 20, 2], ['UK', 'B', 'Q3', 10, 1]]

# Both the above operations can be combined to one statement
finalResult = sales.groupBy( lambda w: (w[0], w[1])).mapValues(func).flatMap(lambda res: res[1])

Note: A custom function like the one above is really helpful here. The same code can easily be converted to Scala.
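
One caveat about `func`: the `i + [size]` step assumes every row is a list, and it raises a `TypeError` if the rows are tuples. Below is a hedged variant that handles both by converting each row to a tuple first. The name `append_group_size` is hypothetical, and the check runs locally on a plain iterator rather than on a Spark group.

```python
def append_group_size(rows):
    """Return every row of one group with the group size appended."""
    group = list(rows)   # materialize the iterable once so it can be counted
    size = len(group)    # number of rows in this group
    # Convert each row to a tuple so list rows and tuple rows are handled alike.
    return [tuple(row) + (size,) for row in group]

# Local check on one group, passed as an iterator of mixed tuple and list rows.
group = iter([("US", "A", "Q1", 10), ["US", "A", "Q2", 20]])
out = append_group_size(group)
print(out)
```

It is meant as a replacement for `func` inside `mapValues(...)`. Like `func`, it holds a whole group in memory at once, so it suits groups of moderate size.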