26
votes

I need to generate a full list of row_numbers for a data table with many columns.

In SQL, this would look like this:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;

Now, let's say in Spark I have an RDD of the form (K, V), where V=(col1, col2, col3), so my entries are like

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

I want to order these using commands like sortBy(), sortWith(), sortByKey(), zipWithIndex, etc. and have a new RDD with the correct row_number

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(I don't care about the parentheses, so the form can also be (K, (col1,col2,col3,rownum)) instead)

How do I do this?

Here's my first attempt:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

Also note that the function sortBy cannot be applied directly to an RDD, but one must run collect() first, and then the output isn't an RDD, either, but an array

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

Here's a little more progress, but still not partitioned:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
4
I'm also looking to answer this question. Hive added analytic functions (including row_number()) in 0.11, and Spark 1.1 supports HiveQL / Hive 0.12. So it seems that sqlContext.hql("select row_number() over(partition by ... should work, but I'm getting an error.dnlbrky

4 Answers

27
votes

The row_number() over (partition by ... order by ...) functionality was added to Spark 1.4. This answer uses PySpark/DataFrames.

Create a test DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()

Add the partitioned row number:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
5
votes

This is an interesting problem you're bringing up. I will answer it in Python but I'm sure you will be able to translate seamlessly to Scala.

Here is how I would tackle it:

1- Simplify your data:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))

temp2 is now a "real" key-value pair. It looks like that:

[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))

]

2- Then, use the group-by function to reproduce the effect of the PARTITION BY:

temp3 = temp2.groupByKey()

temp3 is now a RDD with 2 rows:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]

3- Now, you need to apply a rank function for each value of the RDD. In python, I would use the simple sorted function (the enumerate will create your row_number column):

 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)

Note that to implement your particular order, you would need to feed the right "key" argument (in python, I would just create a lambda function like those:

lambda tuple : (tuple[0],-tuple[1],tuple[2])

At the end (without the key argument function, it looks like that):

[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))

]

Hope that helps!

Good luck.

1
votes
val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))

test: Seq[(String, (Int, Int, Int))] = List((key1,(1,2,3)), (key1,(4,5,6)), (key2,(7,8,9)), (key2,(0,1,2)))

test.foreach(println)

(key1,(1,2,3))

(key1,(4,5,6))

(key2,(7,8,9))

(key2,(0,1,2))

val rdd = sc.parallelize(test, 2)

rdd: org.apache.spark.rdd.RDD[(String, (Int, Int, Int))] = ParallelCollectionRDD[41] at parallelize at :26

val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))

rdd1: org.apache.spark.rdd.RDD[(String, Array[((Int, Int, Int), Int)])] = MapPartitionsRDD[44] at map at :25

val rdd2 = rdd1.flatMap{ 
  elem =>
   val key = elem._1
   elem._2.map(row => (key, row._1, row._2))
 }

rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int, Int), Int)] = MapPartitionsRDD[45] at flatMap at :25

rdd2.collect.foreach(println)

(key1,(1,2,3),0)

(key1,(4,5,6),1)

(key2,(0,1,2),0)

(key2,(7,8,9),1)

0
votes

From spark sql, Read the data files...
val df = spark.read.json("s3://s3bukcet/key/activity/year=2018/month=12/date=15/*");

The above file has fields user_id, pageviews and clicks

Generate the activity Id (row_number) partitioned by user_id and order by clicks

val output = df.withColumn("activity_id", functions.row_number().over(Window.partitionBy("user_id").orderBy("clicks")).cast(DataTypes.IntegerType));