I want to calculate a cumulative count of values in a DataFrame column over the past 1 hour using a moving window. I can get the expected output with a PySpark (non-streaming) window function using rangeBetween, but I want real-time processing, so I am trying Spark Structured Streaming so that whenever a new record/transaction enters the system, I get the desired output.
The data looks like this:
time,col
2019-04-27 01:00:00,A
2019-04-27 00:01:00,A
2019-04-27 00:05:00,B
2019-04-27 01:01:00,A
2019-04-27 00:08:00,B
2019-04-27 00:03:00,A
2019-04-27 03:03:00,A
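For reference, the same sample can also be built in memory rather than read from CSV (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql.functions import to_timestamp

# Hypothetical in-memory copy of the sample data above
rows = [
    ("2019-04-27 01:00:00", "A"),
    ("2019-04-27 00:01:00", "A"),
    ("2019-04-27 00:05:00", "B"),
    ("2019-04-27 01:01:00", "A"),
    ("2019-04-27 00:08:00", "B"),
    ("2019-04-27 00:03:00", "A"),
    ("2019-04-27 03:03:00", "A"),
]
sample_df = spark.createDataFrame(rows, ["time", "col"]) \
    .withColumn("time", to_timestamp("time", "yyyy-MM-dd HH:mm:ss"))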
Using PySpark (non-streaming):
from pyspark.sql.window import Window
from pyspark.sql.functions import unix_timestamp, count

df = sqlContext.read.format("csv") \
    .options(header='true', inferschema='false', delimiter=',') \
    .load(r'/datalocation')

# Convert the time string to epoch seconds so rangeBetween can use it
df = df.withColumn("numddate", unix_timestamp(df.time, "yyyy-MM-dd HH:mm:ss"))

# Count rows with the same col value in the previous 3600 seconds, excluding the current row
w1 = Window.partitionBy("col").orderBy("numddate").rangeBetween(-3600, -1)
df = df.withColumn("B_cumulative_count", count("col").over(w1))
+-------------------+---+----------+------------------+
| time|col| numddate|B_cumulative_count|
+-------------------+---+----------+------------------+
|2019-04-27 00:05:00| B|1556348700| 0|
|2019-04-27 00:08:00| B|1556348880| 1|
|2019-04-27 00:01:00| A|1556348460| 0|
|2019-04-27 00:03:00| A|1556348580| 1|
|2019-04-27 01:00:00| A|1556352000| 2|
|2019-04-27 01:01:00| A|1556352060| 3|
|2019-04-27 03:03:00| A|1556359380| 0|
+-------------------+---+----------+------------------+
(This is the result I need, and the code above produces it.)
With Structured Streaming, this is what I am trying:
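As a side note, the same 1-hour lookback can be expressed without the helper numddate column by ordering the window on epoch seconds derived from the parsed timestamp (a sketch, assuming the imports below):

from pyspark.sql.functions import to_timestamp, count

# Same frame: rows with the same col value in the previous 3600 seconds,
# excluding the current row
w2 = Window.partitionBy("col") \
    .orderBy(to_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("long")) \
    .rangeBetween(-3600, -1)
df = df.withColumn("B_cumulative_count", count("col").over(w2))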
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
from pyspark.sql.functions import window

userSchema = StructType([
    StructField("time", TimestampType()),
    StructField("col", StringType())
])

lines2 = spark \
    .readStream \
    .format('csv') \
    .schema(userSchema) \
    .csv("/datalocation")

# Count records per 1-hour tumbling window and col value
windowedCounts = lines2.groupBy(
    window(lines2.time, "1 hour"),
    lines2.col
).count()

windowedCounts.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("test2") \
    .option("truncate", "false") \
    .start()

spark.table("test2").show(truncate=False)
Streaming output:
+------------------------------------------+---+-----+
|window |col|count|
+------------------------------------------+---+-----+
|[2019-04-27 03:00:00, 2019-04-27 04:00:00]|A |1 |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|A |2 |
|[2019-04-27 01:00:00, 2019-04-27 02:00:00]|A |2 |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|B |2 |
+------------------------------------------+---+-----+
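A sliding window variant would at least let every record fall into multiple overlapping buckets (a sketch; the 1-minute slide interval is an assumption), but the result is still one count per (window, col) bucket rather than the per-record running count shown in the batch output:

from pyspark.sql.functions import window

# 1-hour windows sliding every minute; still returns per-bucket counts
slidingCounts = lines2.groupBy(
    window(lines2.time, "1 hour", "1 minute"),
    lines2.col
).count()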
How can I replicate the same result using Spark Structured Streaming?