4
votes

I am using Spark Streaming to read data from a Kafka cluster. I want to sort a pair DStream and keep only the top N elements. So far I have sorted it using

val result = ds.reduceByKeyAndWindow((x: Double, y: Double) => x + y,
                   Seconds(windowInterval), Seconds(batchInterval))
// transform returns a new DStream; keep the result, otherwise the
// unsorted DStream is printed
val sorted = result.transform(rdd => rdd.sortBy(_._2, false))
sorted.print()

My questions are:

  • How can I get only the top N elements from the DStream?
  • The transform operation is applied RDD by RDD, so will the result be sorted across elements in all RDDs? If not, how can I achieve that?
1
Were you able to solve this? - Shubham Kankaria
Since reduceByKeyAndWindow results in a single RDD, rdd.sortBy().take(N) will work. - Knight71
result.transform(rdd => rdd.sortBy(_._2, false)) - I think adding take inside the transform method gives an error: found: Array, required: RDD[?]. Can you please tell how you solved it? - Alok

1 Answer

3
votes

You can use the transform method on the DStream, sort the input RDD, take its top n elements into a list, and then filter the original RDD down to the elements contained in that list.

Note: Both RDD and DStream are immutable, so any transformation returns a new RDD or DStream and does not modify the original one.

val n = 10
val topN = result.transform(rdd => {
  // Sort by value in descending order and collect the top n pairs
  // to the driver as an Array
  val list = rdd.sortBy(_._2, false).take(n)
  // Keep only the pairs that made it into the top-n list
  rdd.filter(list.contains)
})
topN.print
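To make the sort-take-filter idea concrete without a running Spark cluster, here is a minimal sketch of the same logic on a plain Scala collection. The `TopNSketch` object and its sample data are illustrative, not part of the original answer:

```scala
// Sketch of the answer's top-N logic on a plain Scala Seq of
// (key, aggregated value) pairs -- no Spark dependency needed.
object TopNSketch {
  def topN(pairs: Seq[(String, Double)], n: Int): Seq[(String, Double)] = {
    // Sort by value in descending order and keep the first n pairs,
    // mirroring rdd.sortBy(_._2, false).take(n)
    val top = pairs.sortBy(-_._2).take(n)
    // Filter the original collection down to the selected pairs,
    // mirroring rdd.filter(list.contains)
    pairs.filter(top.contains)
  }

  def main(args: Array[String]): Unit = {
    val counts = Seq(("a", 3.0), ("b", 9.0), ("c", 1.0), ("d", 5.0))
    println(topN(counts, 2)) // List((b,9.0), (d,5.0))
  }
}
```

Note that in real Spark code `take(n)` returns a local `Array`, not an RDD; if you need the result as an RDD (the error mentioned in the comments), you can turn the array back into one, e.g. with `rdd.sparkContext.parallelize(list)` inside the transform.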