1
votes

I am newbie on Stack overflow and to Spark.Basically doing RDD transformation.

My input data :

278222631,2763985,10.02.12,01.01.53,Whatsup,NA,Email,Halter,wagen,28.06.12,313657794,VW,er,i,B,0,23.11.11,234
298106482,2780663,22.02.12,22.02.12,Whatsup,NA,WWW,Halter,wagen,26.06.12,284788860,VW,er,i,B,0,02.06.04,123

My RDD format

val dateCov: RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]

doing some reduceBykey transformations map([(k,k),(v)] on col (1,17) as key and col(18) as Value. And applying some functions on reduceByKey

example:

val reducedSortedRDD = dateCov.map(r => { ((r._1, r._11) -> (r._18)) })
      .reduceByKey((x, y) => ((math.min(x, y)))) // find minimum diff
      .map(r => (r._1._1, r._1._2, r._2))
      .sortBy(_._1, true)
  1. My question - Is it possible after reduceByKey function to get all the others columns i.e my reducedSortedRDD return type should be reducedSortedRDD : RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]

and not reducedSortedRDD: RDD[(Long, Long, Long)] as in this case.

  1. I am doing it right? I just want basically to have a whole initial RDD instead of a subset of RDD after reduceByKey transformation

I am using spark 1.4

1
Which version of Spark are you using?Glennie Helles Sindholt

1 Answers

3
votes

As far as I know, you need to bring all columns along in your reduceByKey function (keep in mind the overhead of shuffling extra data) or alternatively you may be able to join the reducedSortedRDD with your original data.

To bring all columns along, you would do something like this:

val reducedSortedRDD = dateCov
  .map(r => ((r._1, r._11),(r._18, r._2, r._3, ..., r._17)))
  .reduceByKey((value1,value2) => if (value1._1 < value2._1) value1 else value2)
  .map{case(key, value) => (key._1, key._2, value._2, value._3, ..., value._17, value._1)}
  .sortBy(_._1, true)

To do join, it would look something like this:

val keyValuedDateCov = dateCov
  .map(r => ((r._1, r._11, r._18), (r._1, r._2,r._3, ...., r._17)))

val reducedRDD = dateCov
  .map(r => ((r._1, r._11), r._18))
  .reduceByKey((x, y) => math.min(x, y)) // find minimum diff
  .map{case(key, value) => ((key._1, key._2, value), AnyRef)}

val reducedSortedRDD = reducedRDD
  .join(keyValuedDateCov)
  .map{case(key, (_, original)) => (key._1, key._2, original._1, original._2, original._3, ..., original._17, key._3)}
  .sortBy(_._1, true)

The join version has a weakness in that if multiple rows in the original data has the exact same values in columns 1, 17 and 18, then the end result will also contain multiple rows with those values and thus not properly reduced. If data is guaranteed not to have multiple rows with the same values in these columns, there should be no problem.