
I have a PySpark dataframe named h2_df, with columns "parsed_date" (dtype: date) and "id" (dtype: bigint), as shown below:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-21|
|1477469| 2017-12-22|
|1478190| 2017-12-22|
|1478570| 2017-12-22|
|1481415| 2017-12-25|
|1472592| 2017-12-19|
|1474023| 2017-12-20|
+-------+-----------+

I want to create a function to which I pass a date. Inside the function I want to count the ids (from the dataframe h2_df created outside the function) for each date that lies within two ranges: range 1 is (day, day + t) and range 2 is (day + t, day + (2*t)), with t = 5.
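
Just to illustrate what I mean by the two ranges, here they are worked out in plain Python for day = 2017-12-18 (an illustration only, not my actual code):

from datetime import date, timedelta

day = date(2017, 12, 18)
t = timedelta(days=5)

range_1 = (day, day + t)          # (2017-12-18, 2017-12-23)
range_2 = (day + t, day + 2 * t)  # (2017-12-23, 2017-12-28)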

I am new to PySpark, so the code below is of course vague and does not work:

from datetime import timedelta as td
from pyspark.sql.functions import when, count

def hypo_2(day):
    t = td(days=5)
    start_date_before = day
    end_date_before = day + t

    start_date_after = day + t
    end_date_after = day + (2 * t)

    cond_1 = (h2_df["parsed_date"] > start_date_before) & (h2_df["parsed_date"] < end_date_before)
    cond_2 = (h2_df["parsed_date"] > start_date_after) & (h2_df["parsed_date"] < end_date_after)

    df_1 = h2_df.withColumn("count_before", when(cond_1, h2_df.groupBy("parsed_date").agg(count("id"))))
    df_2 = h2_df.withColumn("count_after", when(cond_2, h2_df.groupBy("parsed_date").agg(count("id"))))

I want a function where I can pass any date and it gives me the count of ids for each date, but only for dates that lie in the ranges. So every time I call the function, it takes the date -> creates the 2 ranges from that date -> creates 2 dataframes, each with the count of ids for every date in its range -> returns the 2 dataframes with the count of ids for each date in the range.

For example, on calling hypo_2(2017, 12, 18) the function should return df_1 and df_2. The expected output of df_1 is shown below:

+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1471885| 2017-12-18|            |
|1472928| 2017-12-19|           1|
|1476917| 2017-12-21|           1|
|1477469| 2017-12-22|           3|
|1478190| 2017-12-22|            |
|1478570| 2017-12-22|            |
+-------+-----------+------------+

Please help.

Why is count_before 2 in the first row? Shouldn't it be 6 as all the rows are within 5 days? - mck
Hello @SameekshaSohal, this is an exact duplicate of the question you already asked here, and it was closed for lack of clarity. Please take a few moments to look at How to make good reproducible Apache Spark examples and edit your question to make it more understandable for others. - blackbishop
@mck thank you for looking into it. The count_before is 2 in the first row because there are 2 ids on 2017-12-18, 1 id on 2017-12-19, and so on, and this is for range 1, i.e. (2017-12-18, 2017-12-23). I don't want the total count of ids for the entire range, I just want the count of ids for each date that is in the range. - Samiksha
@blackbishop thank you for sharing the reference. I am new to PySpark and Stack Overflow as well. I am trying to refine the question as best I can. I am really hoping for some help. I hope this time the problem statement is comprehensive. Thanks :) - Samiksha
@SameekshaSohal then why are the last two rows from the original dataframe not included? - mck

1 Answer


You can use a filter to select the time interval of interest, and add a column with the count of ids for each parsed_date:

from pyspark.sql import functions as F, Window

def hypo_2(df, day, t):
    """
    Example usage: df_list = hypo_2(df, '2017-12-18', 5)
    Returns a list of 2 dataframes.
    """
    df1 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date between '{day}' + interval {t} days and '{day}' + interval {t*2} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]
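
A quick way to try it out (a minimal sketch, assuming a SparkSession is already running and that the sample dataframe above is available as h2_df):

# Usage sketch: h2_df is the sample dataframe from the question.
df1, df2 = hypo_2(h2_df, '2017-12-18', 5)

# The window count attaches the per-date total to every row instead of
# collapsing rows (as a groupBy would), so no join back is needed.
df1.show()
df2.show()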