0
votes

This is my code

import org.apache.spark.{SparkConf, SparkContext}
 /**
  * Reads "D:/test.txt", counts how often each (field 8, field 18) pair occurs,
  * and prints, for every distinct field-8 value, its field-18 values with their
  * counts ordered by descending count (at most 1000 per key).
  *
  * Each line is split on the literal "<>" separator.
  * NOTE(review): lines with fewer than 19 fields will throw on s(18) — confirm
  * the input format guarantees enough fields.
  *
  * @param args command-line arguments (unused)
  */
 def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setMaster("local").setAppName("My app")
  val sc = new SparkContext(conf)

  // Always release the SparkContext, even if the job fails.
  try {
   val inputFile = "D:/test.txt"
   val inputData = sc.textFile(inputFile)

   // ((field8, field18), occurrenceCount)
   val DupleRawData = inputData.map(_.split("\\<\\>").toList)
    .map(s => (s(8), s(18)))
    .map(s => (s, 1))
    .reduceByKey(_ + _)

   // (field8, all of its ((field8, field18), count) entries, highest count first)
   val UserShopCount = DupleRawData.groupBy(s => s._1._1)
    .map(s => (s._1, s._2.toList.sortBy(z => z._2).reverse))

   // (field8, List((field18, count))) — the inner key no longer repeats field8.
   val ResultSet = UserShopCount.map(s => (s._1, s._2.take(1000).map(z => (z._1._2, z._2))))

   ResultSet.foreach(println)
   //(aaa,List((100,4), (200,4), (300,3), (800,1)))
   //(bbb,List((100,6), (400,5), (500,4)))
   //(ccc,List((300,7), (400,6), (700,3)))
   // here now I reach..
  } finally {
   sc.stop()
  }
 }

and this is the result I'm getting:

(aaa,List((100,4), (200,4), (300,3), (800,1)))
(bbb,List((100,6), (400,5), (500,4)))
(ccc,List((300,7), (400,6), (700,3)))

The final result-set RDD I want is:
// val ResultSet: org.apache.spark.rdd.RDD[(String, List[(String, Int)])]

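(aaa, List((200,4), (800,1))) // 100 and 300 are dropped because they also appear as keys under bbb or ccc
(bbb, List((500,4)))          // 100 and 400 are dropped because they also appear as keys under aaa or ccc
(ccc, List((700,3)))          // 300 and 400 are dropped because they also appear as keys under aaa or bbb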

Please give me a solution or some advice. Thank you.

2

2 Answers

0
votes

I am assuming your ResultSet is an RDD[(String, List[(Int, Int)])]:

// Sentinel for the first aggregation: its index is Long.MaxValue, so any real
// (index, user, item) triple compares as smaller-or-equal and replaces it.
val zeroVal1: (Long, String, (Int, Int)) = (Long.MaxValue, "", (0, 0))

// Empty accumulator for regrouping the surviving (itemKey, count) pairs per user.
// The element type must match the values being aggregated, i.e. (Int, Int).
val zeroVal2: List[(Int, Int)] = List()

// For every item key, keep only the occurrence that belongs to the row with the
// smallest index, then collect the surviving items back into one list per user.
// NOTE(review): a key shared by several users is kept under the first of them,
// not removed from all of them — the expected output in the question drops such
// keys entirely, so confirm this is the intended semantics.
val yourNeededRdd = ResultSet
  .zipWithIndex()
  // Re-key each item by its own key, remembering the owning user and row index.
  // `case` is required to destructure the nested tuple in a lambda.
  .flatMap {
    case ((key, list), index) => list.map(t => (t._1, (index, key, t)))
  }
  // Per item key: keep the triple with the lowest row index.
  .aggregateByKey(zeroVal1)(
    (t1, t2) => if (t1._1 <= t2._1) t1 else t2,
    (t1, t2) => if (t1._1 <= t2._1) t1 else t2
  )
  // Back to (user, item).
  .map { case (_, (_, key, t)) => (key, t) }
  // Gather each user's items into a list.
  .aggregateByKey(zeroVal2)(
    (l, t) => t :: l,
    (l1, l2) => l1 ++ l2
  )
0
votes

Here is my attempt:

// Sample input: one (user, List((itemKey, count))) entry per user.
val data: Seq[(String, List[(Int, Int)])] = Seq(
  ("aaa", List((1, 4), (2, 4), (3, 3), (8, 1))),
  ("bbb", List((1, 6), (4, 5), (5, 4))),
  ("ccc", List((3, 7), (6, 6), (7, 3)))
)

// Item keys that belong to exactly one user. Keys are de-duplicated per user
// first, so a key repeated inside a single user's list still counts once.
val uniqKeys: Set[Int] = data
  .flatMap { case (_, items) => items.map(_._1).distinct }
  .groupBy(identity)
  .collect { case (key, occurrences) if occurrences.size == 1 => key }
  .toSet

// For every user keep only the items whose key no other user has.
// (The filter must KEEP the unique keys; negating it returns the shared ones.)
val result = data.map { case (pk, items) =>
  (pk, items.filter { case (k, _) => uniqKeys.contains(k) })
}

// Output:
//
// result: Seq[(String, List[(Int, Int)])] = List((aaa,List((2,4), (8,1))), (bbb,List((4,5), (5,4))), (ccc,List((6,6), (7,3))))