I read several data frames from Kafka topics using PySpark Structured Streaming 2.4.4. I would like to add some new columns to those data frames that are mainly based on window calculations over the past N data points (for instance, a moving average over the last 20 data points), and as each new data point is delivered, the corresponding value of MA_20 should be calculated instantly.
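Stated as formulas (writing VIX_k for the k-th received data point, an assumed notation), the requirement is a count-based average over the newest point and the 19 before it, once at least 20 points exist. A time-based window instead averages whatever points fall into a fixed span of clock time, and across an overnight or weekend gap that span may contain only the newest point:

```latex
\mathrm{MA}_{20}(k) = \frac{1}{20}\sum_{i=0}^{19} \mathrm{VIX}_{k-i}
\qquad\text{vs.}\qquad
\mathrm{MA}_{100\,\mathrm{min}}(t) = \frac{1}{|W_t|}\sum_{j \in W_t} \mathrm{VIX}_j,
\quad W_t = \{\, j : t - 100\,\mathrm{min} \le t_j \le t \,\}
```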
Data may look like this:
Timestamp           | VIX
2020-01-22 10:20:32 | 13.05
2020-01-22 10:25:31 | 14.35
2020-01-23 09:00:20 | 14.12
It is worth mentioning that data will be received Monday to Friday, over an 8-hour period each day. Thus, a moving average calculated on Monday morning should include data from Friday!
I have tried different approaches, but I am still not able to achieve what I want.
from pyspark.sql import functions as F

# 100-minute windows sliding every 5 minutes
windows = df_vix \
    .withWatermark("Timestamp", "100 minutes") \
    .groupBy(F.window("Timestamp", "100 minutes", "5 minutes"))
aggregatedDF = windows.agg(F.avg("VIX"))
The preceding code calculates the MA, but it treats the data from Friday as late, so that data is excluded. Rather than the last 100 minutes, the window should cover the last 20 points (at 5-minute intervals).
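To see why a point from the previous day cannot enter that average: in Spark, a sliding window of length 100 minutes and slide 5 minutes assigns each event to the 20 windows [start, start + 100 min) whose starts are multiples of 5 minutes counted from the epoch. The plain-Python sketch below reproduces that assignment for the sample point 2020-01-23 09:00:20; the helper `sliding_windows` is hypothetical, and timestamps are assumed to be naive UTC. The earliest window containing the point starts at 07:25 the same morning, so the previous evening's point (like a Friday point seen on Monday) lies outside every window that contains the new point.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: naive timestamps interpreted as UTC

def sliding_windows(ts, length, slide):
    """List the [start, end) sliding windows containing ts, aligned to the epoch."""
    offset = (ts - EPOCH) % slide   # how far ts lies past the latest slide boundary
    start = ts - offset             # latest window start at or before ts
    windows = []
    while start + length > ts:      # keep stepping back while the window still covers ts
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)

event = datetime(2020, 1, 23, 9, 0, 20)
previous = datetime(2020, 1, 22, 10, 25, 31)
wins = sliding_windows(event, timedelta(minutes=100), timedelta(minutes=5))
print(len(wins))                                   # 20
print(wins[0][0])                                  # 2020-01-23 07:25:00
print(any(s <= previous < e for s, e in wins))     # False
```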
I thought I could use rowsBetween or rangeBetween, but on streaming data frames a window cannot be applied over non-timestamp columns (such as F.col('Timestamp').cast('long')):
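from pyspark.sql import Window
from pyspark.sql import functions as F

# Frame: the 600 preceding rows plus the current row
w = Window.orderBy(F.col('Timestamp').cast('long')).rowsBetween(-600, 0)
df = df_vix.withColumn('MA_20', F.avg('VIX').over(w))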
On the other hand, there is no way to specify an interval in rowsBetween(): using rowsBetween(-minutes(20), 0) throws an error saying that minutes is not defined (there is no such function in pyspark.sql.functions).
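On a static (non-streaming) DataFrame, rowsBetween and rangeBetween take plain integer offsets rather than intervals: for rowsBetween the offset counts rows, and for rangeBetween it is measured in the units of the ordering expression, which for a Timestamp cast to long is seconds. So "the last 20 points" would be rowsBetween(-19, 0) and "the last 100 minutes" rangeBetween(-100 * 60, 0). The plain-Python sketch below only imitates both frame types on the sample rows, without calling Spark; `minutes()` is a hypothetical helper, not part of pyspark.sql.functions.

```python
from datetime import datetime
from typing import List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)  # assumption: naive timestamps treated as UTC

def to_seconds(s: str) -> int:
    """Seconds since the epoch, like F.col('Timestamp').cast('long')."""
    return int((datetime.strptime(s, "%Y-%m-%d %H:%M:%S") - EPOCH).total_seconds())

def minutes(m: int) -> int:
    """Hypothetical helper: m minutes expressed in seconds."""
    return m * 60

Rows = List[Tuple[int, float]]  # (epoch seconds, VIX), sorted by time

def rows_frame_avg(rows: Rows, start: int, end: int = 0) -> List[Optional[float]]:
    """Imitate avg('VIX').over(Window.orderBy(ts).rowsBetween(start, end))."""
    out = []
    for i in range(len(rows)):
        lo, hi = max(0, i + start), min(len(rows) - 1, i + end)
        vals = [v for _, v in rows[lo:hi + 1]]
        out.append(sum(vals) / len(vals) if vals else None)  # empty frame -> null
    return out

def range_frame_avg(rows: Rows, start: int, end: int = 0) -> List[Optional[float]]:
    """Imitate avg('VIX').over(Window.orderBy(ts).rangeBetween(start, end))."""
    out = []
    for t, _ in rows:
        vals = [v for s, v in rows if t + start <= s <= t + end]
        out.append(sum(vals) / len(vals) if vals else None)
    return out

rows = [(to_seconds("2020-01-22 10:20:32"), 13.05),
        (to_seconds("2020-01-22 10:25:31"), 14.35),
        (to_seconds("2020-01-23 09:00:20"), 14.12)]

print(rows_frame_avg(rows, -19))             # last 20 points: third row includes the previous day
print(range_frame_avg(rows, -minutes(100)))  # last 100 minutes: third row averages only itself
```

A frame such as rowsBetween(-20, -1) excludes the current row, so its first value is null (None here) because no earlier rows exist.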
I found another way, but it doesn't work for streaming data frames either. I don't know why the 'Non-time-based windows are not supported on streaming DataFrames' error is raised (df_vix.Timestamp is of timestamp type):
df_vix.createOrReplaceTempView("df_vix")
aggregatedDF = spark.sql(
"""SELECT *, mean(VIX) OVER (
ORDER BY CAST(df_vix.Timestamp AS timestamp)
RANGE BETWEEN INTERVAL 100 MINUTES PRECEDING AND CURRENT ROW
) AS mean FROM df_vix""")
I have no idea what else I could use to calculate a simple moving average. It looks like it is impossible to achieve that in PySpark... Maybe a better solution would be to convert the entire Spark data frame to pandas each time new data arrives and calculate everything in pandas (or append the new rows to a pandas data frame and calculate the MA there)?
I thought that creating new features as new data arrives is the main purpose of Structured Streaming, but since it turns out PySpark is not suited to this, I am considering giving up PySpark and moving to pandas...
EDIT
The following doesn't work either: although df_vix.Timestamp is of type 'timestamp', it still throws the 'Non-time-based windows are not supported on streaming DataFrames' error.
w = Window.orderBy(df_vix.Timestamp).rowsBetween(-20, -1)
aggregatedDF = df_vix.withColumn("MA", F.avg("VIX").over(w))
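The error does not depend on the type of the ordering column: in Spark 2.4, Structured Streaming rejects every OVER-clause (analytic) window on a streaming DataFrame, whatever it is ordered by, and only time windows used in groupBy(F.window(...)) aggregations are allowed. One possible workaround, offered as an untested suggestion, is DataStreamWriter.foreachBatch (available in PySpark since 2.4). Each micro-batch reaches the callback as a static DataFrame, where ordinary window functions work, so the remaining problem is carrying the last 19 points from one batch into the next. The sketch below shows only that carry-over logic in plain Python. `process_batch` and `tail` are hypothetical names, lists of (timestamp, VIX) tuples stand in for the micro-batch DataFrames, and it assumes no row arrives older than the carried tail.

```python
from typing import List, Tuple

Row = Tuple[str, float]  # (timestamp as 'YYYY-MM-DD HH:MM:SS', VIX)

def process_batch(batch: List[Row], tail: List[Row], n: int = 20):
    """Return (rows with MA over the last n points, new tail) for one micro-batch.

    tail holds up to n - 1 rows from earlier batches; in Spark this state
    would have to be kept outside the stream between foreachBatch calls.
    """
    history = tail + sorted(batch)  # ISO-formatted timestamps sort correctly as strings
    out = []
    for i in range(len(tail), len(history)):              # only the new rows get an MA
        window = history[max(0, i - n + 1): i + 1]         # current row and up to n - 1 before it
        ma = sum(v for _, v in window) / len(window)
        out.append((history[i][0], history[i][1], ma))
    new_tail = history[-(n - 1):] if n > 1 else []
    return out, new_tail

# Hypothetical wiring (not executed here):
# def handle(batch_df, batch_id): ...  # collect rows, call process_batch, write the results
# df_vix.writeStream.foreachBatch(handle).start()

tail: List[Row] = []
out1, tail = process_batch([("2020-01-22 10:20:32", 13.05),
                            ("2020-01-22 10:25:31", 14.35)], tail)
out2, tail = process_batch([("2020-01-23 09:00:20", 14.12)], tail)
print(out2)  # the Thursday point's MA still averages in the two Wednesday points
```

Collecting each micro-batch to the driver like this is only reasonable for small batches such as a few VIX ticks; for larger data the same idea could presumably be applied with a regular Spark window over the union of the stored tail and the batch.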