I thought rangeBetween(start, end) looks at the ORDER BY values in the range [cur_value + start, cur_value + end], as described in the WindowSpec docs: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/expressions/WindowSpec.html
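To make that mental model concrete, here is a minimal pure-Python sketch (a model, not Spark code) of the value-based frame the linked docs describe, assuming ascending order: both offsets are added to the current row's ORDER BY value, so rangeBetween(-1, 0) covers values in [cur - 1, cur]. The helper name `asc_range_frame` is hypothetical.

```python
def asc_range_frame(values, cur, start, end):
    """Values falling in an ascending RANGE frame for the row whose ORDER BY value is `cur`.

    Models the documented rule: lower bound = cur + start, upper bound = cur + end.
    """
    lo, hi = cur + start, cur + end
    return sorted(v for v in values if lo <= v <= hi)


# Partition "b" from the question, ascending order, rangeBetween(-1, 0)
ids_b = [1, 2, 3]
for cur in ids_b:
    print(cur, asc_range_frame(ids_b, cur, -1, 0))
# 1 [1]
# 2 [1, 2]
# 3 [2, 3]
```

Under this model the offsets never depend on the sort direction, which is exactly the assumption the rest of the question puts to the test.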

But then I saw an example that ordered by a timestamp in descending order (orderBy(desc(...))) and used rangeBetween(Window.unboundedPreceding, 0). That led me to explore the following example:

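from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import desc, sum as Fsum

spark = SparkSession.builder.getOrCreate()

dd = spark.createDataFrame(
    [(1, "a"), (3, "a"), (3, "a"), (1, "b"), (2, "b"), (3, "b")],
    ['id', 'category']
)
dd.show()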

# output
+---+--------+
| id|category|
+---+--------+
|  1|       a|
|  3|       a|
|  3|       a|
|  1|       b|
|  2|       b|
|  3|       b|
+---+--------+

With a descending orderBy, the frame seems to include the preceding row whose value is higher by 1:

byCategoryOrderedById = Window.partitionBy('category')\
                              .orderBy(desc('id'))\
                              .rangeBetween(-1, Window.currentRow)
dd.withColumn("sum", Fsum('id').over(byCategoryOrderedById)).show()

# output
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  3|       b|  3|
|  2|       b|  5|
|  1|       b|  3|
|  3|       a|  6|
|  3|       a|  6|
|  1|       a|  1|
+---+--------+---+

With start set to -2, it includes the preceding rows whose value is up to 2 higher:

byCategoryOrderedById = Window.partitionBy('category')\
                              .orderBy(desc('id'))\
                              .rangeBetween(-2, Window.currentRow)
dd.withColumn("sum", Fsum('id').over(byCategoryOrderedById)).show()

# output
+---+--------+---+
| id|category|sum|
+---+--------+---+
|  3|       b|  3|
|  2|       b|  5|
|  1|       b|  6|
|  3|       a|  6|
|  3|       a|  6|
|  1|       a|  7|
+---+--------+---+
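Both outputs above are consistent with a simple rule, sketched here as a pure-Python model (an assumption inferred from the outputs, not Spark's implementation): under descending order the offsets are applied in the opposite direction, so rangeBetween(start, end) covers ORDER BY values in [cur - end, cur - start]. The helper `desc_range_sum` is hypothetical.

```python
def desc_range_sum(values, start, end):
    """Per-row sum over a RANGE frame when the window is ordered DESCENDING.

    Assumed rule: with descending order "preceding" means larger values, so
    rangeBetween(start, end) covers ORDER BY values in [cur - end, cur - start].
    Returns (id, sum) pairs in descending id order, like the tables above.
    """
    return [
        (cur, sum(v for v in values if cur - end <= v <= cur - start))
        for cur in sorted(values, reverse=True)
    ]


partitions = {"b": [1, 2, 3], "a": [1, 3, 3]}
for start in (-1, -2):
    print(f"rangeBetween({start}, 0)")
    for cat, ids in partitions.items():
        for cur, total in desc_range_sum(ids, start, 0):
            print(cur, cat, total)
```

Running it prints the same (id, category, sum) rows as the two `show()` outputs, which suggests the frame is value-based but mirrored by the descending sort.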

So, what is the exact behavior of rangeBetween with desc orderBy?

Answer


It's not well documented, but when using range (value-based) frames, the sort direction (ascending or descending) determines which values fall inside the frame.

Let's take the example you provided:

RANGE BETWEEN 1 PRECEDING AND CURRENT ROW

Depending on the ORDER BY direction, the 1 PRECEDING bound means:

  • current_row_value - 1 if ASC
  • current_row_value + 1 if DESC
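The mapping in the two bullets can be written as a one-line function. This is a hedged pure-Python illustration of the rule stated above, not a Spark API; the name `preceding_bound` is hypothetical.

```python
def preceding_bound(cur, n, descending):
    """ORDER BY value reached by `n PRECEDING` in a RANGE frame.

    Ascending: preceding rows hold smaller values, so the bound is cur - n.
    Descending: preceding rows hold larger values, so the bound is cur + n.
    """
    return cur + n if descending else cur - n


print(preceding_bound(1, 1, descending=False))  # 0
print(preceding_bound(1, 1, descending=True))   # 2
```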

Consider the row with value 1 in partition b.

  • With descending order, the frame includes:

    (current_value and all preceding values x where x <= current_value + 1) = (1, 2)

  • With ascending order, the frame includes:

    (current_value and all preceding values x where x >= current_value - 1) = (1)
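The same worked example for the row with value 1 in partition b, as a small pure-Python model of RANGE BETWEEN n PRECEDING AND CURRENT ROW (a sketch of the rule described in this answer, not Spark code; `frame_values` is a hypothetical helper):

```python
def frame_values(values, cur, n_preceding, descending):
    """Values in RANGE BETWEEN n PRECEDING AND CURRENT ROW for the row holding `cur`."""
    if descending:
        lo, hi = cur, cur + n_preceding   # preceding rows hold larger values
    else:
        lo, hi = cur - n_preceding, cur   # preceding rows hold smaller values
    return sorted(v for v in values if lo <= v <= hi)


ids_b = [1, 2, 3]
print(frame_values(ids_b, 1, 1, descending=True))   # [1, 2]
print(frame_values(ids_b, 1, 1, descending=False))  # [1]
```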

PS: using rangeBetween(-1, Window.currentRow) with desc ordering is equivalent to rangeBetween(Window.currentRow, 1) with asc ordering.
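A quick way to check that equivalence on the question's data is a pure-Python model of both directions (an illustration under the rule described above, not Spark code; `range_frame` is a hypothetical helper). With desc ordering, (-1, 0) covers [cur, cur + 1]; with asc ordering, (0, 1) covers the same interval.

```python
def range_frame(values, cur, start, end, descending):
    """Values in rangeBetween(start, end) for the row holding `cur` (model, not Spark code)."""
    if descending:
        lo, hi = cur - end, cur - start   # offsets mirrored by the descending sort
    else:
        lo, hi = cur + start, cur + end   # offsets added to the current value
    return sorted(v for v in values if lo <= v <= hi)


data = {"a": [1, 3, 3], "b": [1, 2, 3]}
for ids in data.values():
    for cur in ids:
        desc_frame = range_frame(ids, cur, -1, 0, descending=True)
        asc_frame = range_frame(ids, cur, 0, 1, descending=False)
        assert desc_frame == asc_frame
print("equivalent on the question's data")
```

Only the set of values in each frame is compared here; the row order of the final output still follows whichever orderBy the window uses.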