I have a dataframe containing the following 3 columns:
1. ID
2. timestamp
3. IP_Address
The data spans from 2019-07-01 to 2019-09-20. I am trying to aggregate counts of IP_Address over the last 60 days, partitioned by ID, for all the rows in the 20-day period from 2019-09-01 to 2019-09-20.
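For reference, here is a minimal, made-up sample with the same three columns (the values and the session setup are just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy rows with the schema described above (ID, timestamp, IP_Address);
# the real data obviously has far more rows.
df = spark.createDataFrame(
    [
        (1, "2019-07-05 10:15:00", "10.0.0.1"),
        (1, "2019-09-02 08:30:00", "10.0.0.2"),
        (2, "2019-09-10 23:45:00", "192.168.1.7"),
    ],
    ["id", "timestamp", "ip_address"],
)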
I have tried using the following window function and it works just fine:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, unix_timestamp
from pyspark.sql.window import Window

days = lambda i: i * 86400  # i days in seconds, since the ordering column is a unix timestamp

w = Window.partitionBy('id')\
    .orderBy(unix_timestamp(col('timestamp')))\
    .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.withColumn("ip_counts", count(df.ip_address).over(w))
However, the problem with that is that it calculates these aggregations even for the period I don't need them for: 2019-07-01 to 2019-08-31. I could easily filter the results down to the selected period after the computation, but I want to avoid that unnecessary work since I am dealing with ~3-10 million rows per day.
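For clarity, this is the retrospective filtering I would like to avoid (a sketch reusing w and the imports from above; df_full and df_trimmed are just illustrative names):

# Windowed count over the entire dataframe first, then trim to the 20-day period afterwards.
df_full = df.withColumn("ip_counts", count(df.ip_address).over(w))
df_trimmed = df_full.where(
    (df_full.timestamp >= F.lit('2019-09-01').cast('timestamp')) &
    (df_full.timestamp <= F.lit('2019-09-20').cast('timestamp'))
)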
If I filter the dataframe like this:
dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast("timestamp") for s in dates]
w = Window.partitionBy('id')\
    .orderBy(unix_timestamp(col('timestamp')))\
    .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.where((df.timestamp >= date_from) & (df.timestamp <= date_to))\
    .withColumn("ip_counts", count(df.ip_address).over(w))
then the window for each row in that period can no longer see the preceding 60 days of data for that ID, and the counts come out incorrect.
What can I do to compute the aggregation only for the rows falling between 2019-09-01 and 2019-09-20, while still making sure each window has access to the preceding 60 days of data for its ID? Thank you so much for your help.