
I have a dataframe containing the following 3 columns:

1. ID
2. timestamp
3. IP_Address

The data spans from 2019-07-01 to 2019-09-20. I am trying to aggregate counts of IP_Address over the last 60 days, partitioned by ID, for all rows in the 20-day period from 2019-09-01 to 2019-09-20.

I have tried using the following window function and it works just fine:

from pyspark.sql import Window
from pyspark.sql.functions import col, count, unix_timestamp

days = lambda i: i * 86400  # days -> seconds, since rangeBetween works on the orderBy values

w = Window.partitionBy('id')\
          .orderBy(unix_timestamp(col('timestamp')))\
          .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.withColumn("ip_counts", count(df.ip_address).over(w))

However, the problem is that it calculates these aggregations even for the period I don't need them for: 2019-07-01 to 2019-08-31. I could easily filter the results down to the selected period after the computation, but I want to avoid the unnecessary work since I am dealing with roughly 3-10 million rows per day.
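
To be concrete, the retrospective filter I mean is just something like this, reusing the col import from above (df_with_counts is a hypothetical name for the output of the window step):

# post-hoc filter: keep only the target period after the window has already run
df_target = df_with_counts.where((col('timestamp') >= '2019-09-01') &
                                 (col('timestamp') <= '2019-09-20'))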

If I filter the dataframe like this:

from pyspark.sql import functions as F

dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast("timestamp") for s in dates]

w = Window.partitionBy('id')\
          .orderBy(unix_timestamp(col('timestamp')))\
          .rangeBetween(start=-days(60), end=Window.currentRow)

df = df.where((df.timestamp >= date_from) & (df.timestamp <= date_to))\
       .withColumn("ip_counts", count(df.ip_address).over(w))

in that case, the rows in this period can no longer see the preceding 60 days of data for their IDs, and therefore the counts are incorrect.

What can I do to compute the aggregations only for the rows falling between 2019-09-01 and 2019-09-20, while making sure that each window still has access to the preceding 60 days of data for its ID? Thank you so much for your help.


1 Answer


I would first make a new dataframe that keeps only the data you actually need: the 60 days of history leading up to 2019-09-01 plus the 2019-09-01 to 2019-09-20 period itself. Then follow your first method on that smaller frame and, at the end, keep only the rows falling between 2019-09-01 and 2019-09-20.
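
A minimal sketch of that idea, reusing the column names and the days helper from your question; the cutoff computed with date_sub (60 days before 2019-09-01) is my assumption about how far back the history needs to reach:

from pyspark.sql import Window
from pyspark.sql import functions as F

days = lambda i: i * 86400  # days -> seconds for the rangeBetween window

dates = ('2019-09-01', '2019-09-20')
date_from, date_to = [F.to_date(F.lit(s)).cast('timestamp') for s in dates]
# keep the 60 days of history leading up to the target period as well
history_from = F.date_sub(F.to_date(F.lit(dates[0])), 60).cast('timestamp')

w = Window.partitionBy('id')\
          .orderBy(F.unix_timestamp(F.col('timestamp')))\
          .rangeBetween(start=-days(60), end=Window.currentRow)

result = (df.where((df.timestamp >= history_from) & (df.timestamp <= date_to))    # history + target period only
            .withColumn('ip_counts', F.count('ip_address').over(w))               # 60-day rolling count per id
            .where((F.col('timestamp') >= date_from) & (F.col('timestamp') <= date_to)))  # keep only the target rows

The final where drops the rows that were only needed as history, while the window for every remaining row still saw the full preceding 60 days for its ID.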