1
votes

I have two pair RDDs that I joined on the same key, and now I want to sort the result by one of the values. The joined RDD has type: RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]

where the first part is the pair RDD key and the Iterable holds the values from the two RDDs I joined. I now want to order them by the time field (the DateTime) of the second RDD. I tried to use the sortBy function but I got errors.

Any ideas?

Thanks

4
Improve your question to get a quick and good answer. - Kumar
Show your code, and the errors. - The Archetypal Paul

4 Answers

0
votes

Spark pair RDDs have a mapValues method. I think it will help you.

    def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]
    Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.

Spark Documentation has more details.

0
votes

You're right that you can use the sortBy function:

val yourRdd: RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])] = ...(your cogroup operation here)

val result = yourRdd.sortBy({
  case ((str, i), iter) if iter.nonEmpty => iter.head._2._2
  }, true)

iter.head has type of ((String, DateTime, Int,Int), (String, DateTime, String, String));

iter.head._2 has type of (String, DateTime, String, String) and

iter.head._2._2 does indeed have type DateTime.

You may also need to provide an implicit Ordering for DateTime. By the way, can the iterable be empty? If so, you should handle that case in the function passed to sortBy. And if the iterable contains many items, which one should be used for sorting?
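One possible way to settle those open questions, as a rough sketch rather than a definitive answer: use the earliest timestamp in each group as the sort key, and fall back to a default so that empty groups sort last. The helper name earliestOr and the object SortKeySketch are made up for illustration and are not part of Spark. To keep the example runnable without Spark or Joda-Time, it uses a plain Seq in place of the RDD and Long epoch milliseconds in place of DateTime.

```scala
object SortKeySketch {
  // Hypothetical helper (not a Spark API): returns the smallest key found in
  // `items`, or `default` when `items` is empty.
  def earliestOr[A, K](items: Iterable[A], default: K)(key: A => K)(implicit ord: Ordering[K]): K =
    if (items.isEmpty) default else items.map(key).min

  // Same shape as the joined values, with Long timestamps standing in for DateTime.
  type Row = ((String, Long, Int, Int), (String, Long, String, String))

  def main(args: Array[String]): Unit = {
    // Plain-collection stand-in for the joined RDD.
    val groups: Seq[((String, Int), Iterable[Row])] = Seq(
      (("b", 2), Seq((("x", 0L, 0, 0), ("y", 300L, "", "")),
                     (("x", 0L, 0, 0), ("y", 100L, "", "")))),
      (("c", 3), Nil), // empty group: gets the default key
      (("a", 1), Seq((("x", 0L, 0, 0), ("y", 200L, "", ""))))
    )

    // Sort groups by the earliest timestamp of the second tuple; empty groups go last.
    val sorted = groups.sortBy { case (_, iter) => earliestOr(iter, Long.MaxValue)(_._2._2) }

    sorted.foreach { case (k, _) => println(k) } // prints (b,2), (a,1), (c,3)
  }
}
```

With Spark, the same key function could presumably be passed to sortBy on the RDD, for example yourRdd.sortBy { case (_, iter) => SortKeySketch.earliestOr(iter, Long.MaxValue)(_._2._2.getMillis) }. Sorting on a Long key this way avoids the need for an implicit Ordering for DateTime.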

0
votes

If the RDD's Iterable needs to be sorted:

val rdd: RDD[((String, Int), 
             Iterable[((String, DateTime, Int,Int), 
                       (String, DateTime, String, String))])] = ???

val dateOrdering = new Ordering[org.joda.time.DateTime]{ 
    override def compare(a: org.joda.time.DateTime,
                         b: org.joda.time.DateTime) = 
        a.compareTo(b) // returns 0 for equal instants, as an Ordering must
}

rdd.mapValues(v => v.toArray
                    .sortBy(x => x._2._2)(dateOrdering))

0
votes

Using python:

sortedRDD = unsortedRDD.sortBy(lambda x:x[1][1], False)

This sorts in descending order.