
I want to calculate a running sum over the last one hour for each transaction, using Spark-Scala. I have the following dataframe with three fields and want to compute the fourth field as shown below:

Customer  TimeStamp           Tr  Last_1Hr_RunningSum
Cust-1    6/1/2015 6:51:55     1                    1
Cust-1    6/1/2015 6:58:34     3                    4
Cust-1    6/1/2015 7:20:46     3                    7
Cust-1    6/1/2015 7:40:45     4                   11
Cust-1    6/1/2015 7:55:34     5                   15
Cust-1    6/1/2015 8:20:34     0                   12
Cust-1    6/1/2015 8:34:34     3                   12
Cust-1    6/1/2015 9:35:34     7                    7
Cust-1    6/1/2015 9:45:34     3                   10
Cust-2    6/1/2015 16:26:34    2                    2
Cust-2    6/1/2015 16:35:34    1                    3
Cust-2    6/1/2015 17:39:34    3                    3
Cust-2    6/1/2015 17:43:34    5                    8
Cust-3    6/1/2015 17:17:34    6                    6
Cust-3    6/1/2015 17:21:34    4                   10
Cust-3    6/1/2015 17:45:34    2                   12
Cust-3    6/1/2015 17:56:34    3                   15
Cust-3    6/1/2015 18:21:34    4                   13
Cust-3    6/1/2015 19:24:34    1                    1

I want to calculate "Last_1Hr_RunningSum" as new field which look back for one hour from each transaction by customer id and take some of "Tr"(Transaction filed).

  1. For example: Cust-1 at 6/1/2015 8:20:34 will look back as far as 6/1/2015 7:20:46 and take the sum (0+5+4+3) = 12.
  2. In the same way, for each row I want to look back one hour and take the sum of all transactions during that hour (a rough sketch of the intended logic follows this list).
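
Expressed on a plain Scala collection for a single customer (timestamps already converted to epoch milliseconds; the values here are made up just to illustrate the rule):

// Per-customer transactions as (epochMillis, tr) pairs, sorted by time.
val txns = Seq((0L, 1), (400000L, 3), (1731000L, 3))

// For each transaction, sum every Tr whose timestamp falls in the hour
// ending at (and including) that transaction's timestamp.
val runningSums = txns.map { case (t, _) =>
  txns.collect { case (t2, tr) if t2 <= t && t2 >= t - 3600000L => tr }.sum
}
// runningSums == List(1, 4, 7)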

I tried running sqlContext.sql with a nested query, but it gives me an error. Also, window functions and ROW_NUMBER() OVER (PARTITION BY ...) are not supported by the Spark-Scala SQLContext.

How can I get the sum over the last one hour of "Tr", using the column "TimeStamp", with Spark-Scala only?

Thanks in advance.

You should show the query and fix the indentation and format of the sample data, and the expected data. – Alberto Bonsanto
@AlbertoBonsanto I have fixed the indentation and format of the sample data. The input table is the first three fields, and the expected output table includes the additional field "Last_1Hr_RunningSum". – Nikhil
"I tried running sqlContext.sql with a nested query, but it gives me an error" Are we intended to guess what query you tried and what error you got? – The Archetypal Paul
@TheArchetypalPaul Nested queries are not supported in Spark SQL. – Nikhil

1 Answer


I tried running sqlContext.sql with a nested query, but it gives me an error

Did you try using a join?

// Register the DataFrame as a temp table so it can be queried with SQL
df.registerTempTable("input")

// Self-join on Customer: for each row `a`, match every row `b` of the
// same customer whose timestamp falls within the hour ending at
// a.Timestamp (3600000 ms = 1 hour), then sum b.Tr per group.
val result = sqlContext.sql("""
        SELECT
           FIRST(a.Customer) AS Customer,
           FIRST(a.Timestamp) AS Timestamp,
           FIRST(a.Tr) AS Tr,
           SUM(b.Tr) AS Last_1Hr_RunningSum
        FROM input a
        JOIN input b ON
          a.Customer = b.Customer
          AND b.Timestamp BETWEEN (a.Timestamp - 3600000) AND a.Timestamp
        GROUP BY a.Customer, a.Timestamp
        ORDER BY a.Customer, a.Timestamp
        """)

result.show()

Which prints the expected result:

+--------+-------------+---+-------------------+
|Customer|    Timestamp| Tr|Last_1Hr_RunningSum|
+--------+-------------+---+-------------------+
|  Cust-1|1420519915000|  1|                1.0|
|  Cust-1|1420520314000|  3|                4.0|
|  Cust-1|1420521646000|  3|                7.0|
|  Cust-1|1420522845000|  4|               11.0|
|  Cust-1|1420523734000|  5|               15.0|
|  Cust-1|1420525234000|  0|               12.0|
|  Cust-1|1420526074000|  3|               12.0|
|  Cust-1|1420529734000|  7|                7.0|
|  Cust-1|1420530334000|  3|               10.0|
|  Cust-2|1420554394000|  2|                2.0|
|  Cust-2|1420554934000|  1|                3.0|
|  Cust-2|1420558774000|  3|                3.0|
|  Cust-2|1420559014000|  5|                8.0|
|  Cust-3|1420557454000|  6|                6.0|
|  Cust-3|1420557694000|  4|               10.0|
|  Cust-3|1420559134000|  2|               12.0|
|  Cust-3|1420559794000|  3|               15.0|
|  Cust-3|1420561294000|  4|               13.0|
|  Cust-3|1420565074000|  1|                1.0|
+--------+-------------+---+-------------------+

(This solution assumes the Timestamp column holds epoch time in milliseconds, which is why the one-hour window is written as 3600000.)
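
Note that the question's TimeStamp values are strings like "6/1/2015 6:51:55", so they have to be turned into epoch milliseconds before the BETWEEN arithmetic can work. A minimal sketch of that conversion, assuming Spark 1.5+ (where unix_timestamp is available) and a d/M/yyyy date format (which matches the epoch values printed above):

import org.apache.spark.sql.functions.unix_timestamp

// unix_timestamp parses the string into epoch seconds; multiplying by
// 1000 gives the milliseconds the join condition expects.
val withMillis = df.withColumn(
  "TimeStamp",
  unix_timestamp(df("TimeStamp"), "d/M/yyyy H:mm:ss") * 1000L
)
withMillis.registerTempTable("input")

Registering the converted dataframe as "input" lets the self-join above run unchanged.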