1
votes

I am trying to calculate the part of the activity value that does not originate from extra credit.

Input:

+------+--------+------+
|period|activity|credit|
+------+--------+------+
|     1|       5|     0|
|     2|       0|     3|
|     3|       4|     0|
|     4|       0|     3|
|     5|       1|     0|
|     6|       1|     0|
|     7|       5|     0|
|     8|       0|     1|
|     9|       0|     1|
|    10|       5|     0|
+------+--------+------+

Output:

rdd = sc.parallelize([(1,5,0,5),(2,0,3,0),(3,4,0,1),(4,0,3,0),(5,1,0,0),(6,1,0,0),(7,5,0,4),(8,0,1,0),(9,0,1,0),(10,5,0,3)])
df = rdd.toDF(["period","activity","credit","realActivity"])

+------+--------+------+------------+
|period|activity|credit|realActivity|
+------+--------+------+------------+
|     1|       5|     0|           5|
|     2|       0|     3|           0|
|     3|       4|     0|           1|
|     4|       0|     3|           0|
|     5|       1|     0|           0|
|     6|       1|     0|           0|
|     7|       5|     0|           4|
|     8|       0|     1|           0|
|     9|       0|     1|           0|
|    10|       5|     0|           3|
+------+--------+------+------------+

I tried to create a credit balance column that adds and deducts based on the row type, but I could not make it restart conditionally (every time it goes below zero) based on its own previous value. It looks like a recursive problem, and I am not sure how to express it in a PySpark-friendly way. Obviously, I can't do the following, which references the previous value of the column being computed:

w = Window.orderBy("period")

df = df.withColumn("realActivity", lag("realActivity",1,0).over(w) - lag("credit", 1, 0).over(w) - lag("activity",1,0).over(w) )

Update: As was pointed out, this is not possible with a window calculation. Therefore I would like to do something like the snippet below to calculate creditBalance, which would let me calculate realActivity.

df['creditBalance']=0     
for i in range(1, len(df)):
    if (df.loc[i-1, 'creditBalance']) > 0:
        df.loc[i, 'creditBalance'] = df.loc[i-1, 'creditBalance'] + df.loc[i, 'credit'] - df.loc[i, 'activity']
    elif df.loc[i, 'credit'] > 0:
        df.loc[i, 'creditBalance'] = df.loc[i, 'credit'] - df.loc[i, 'activity']

Now, my only question is: how can I apply this "local" function to each group in a spark dataframe?

  • write dataframe to files by group and process locally?
  • custom map and collect the rows for the local execution?
  • collapse rows to a single row by group and process that?
  • anything else?
1
Could you please further explain how realActivity is calculated? For example, how are the values for period 1 and 2 computed? - pansen
Sure. Real activity is meant to represent the activity units that did not 'originate' from credit. In the first row the activity is 5 and nothing was credited, so it is 5. In period 3 the activity is 4 but 3 was credited before, so it is only 1. In period 10 the activity is 5 but there were 2 credits before it (periods 8 and 9), so the real activity is 3 - ponthu
I guess this is not possible without a window function which keeps some internal state of the current credit. The difficult part is that the credit may span multiple rows. Otherwise, you could have used lag and a cumulative sum. If your data is not too large, you could give it a shot in pandas. - pansen
Yes, the problem is when another credit event happens before the previous amount is "exhausted". We are talking about 500M observations in total and 25 observations on average per group (user). I have been thinking of collapsing the activities to a single row per group and iterating through them within that row. Can you think of any better way? Thanks, Andras - ponthu
@pansen, updated the question with my local snippet. Could you please advise what approach I should take next? - ponthu

1 Answer

0
votes

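@pansen, I've solved the issue with the following code. It computes the credit used in each period (creditUsage) per user; realActivity is then the activity minus creditUsage. It may be useful in case you are trying to solve a similar problem.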

from pyspark.sql.functions import udf, col, concat_ws, collect_list, explode, split
from pyspark.sql.types import ArrayType, StringType

def creditUsage(rows):
    '''
    Input: list of "timestamp;activity;credit" strings for one user
    ['1;5;0', '2;0;3', '3;4;0', '4;0;3', '5;1;0', '6;1;0', '7;5;0', '8;0;1', '9;0;1', '10;5;0']

    Output: list of "timestamp;creditUsage" strings
    '''
    # collect_list does not guarantee ordering, so sort the rows by timestamp first
    parsed = sorted((int(ts), float(act), float(cr))
                    for ts, act, cr in (r.split(";") for r in rows))

    creditBalance = 0.0  # running balance carried from one row to the next
    output = []
    for ts, trActivity, credit in parsed:
        creditBalance += credit
        # if the balance covers the activity, the activity is the usage; otherwise the balance is
        used = min(creditBalance, trActivity)
        creditBalance -= used
        output.append("{0};{1}".format(ts, used))
    return output

realBonusUDF = udf(creditUsage, ArrayType(StringType()))

a= df.withColumn('data', concat_ws(';', col('period'), col('activity'), col('credit'))) \
  .groupBy('userID').agg(collect_list('data').alias('data')) \
  .withColumn('data', realBonusUDF('data')) \
  .withColumn("data", explode("data")) \
  .withColumn("data", split("data", ";")) \
  .withColumn("timestamp", col('data')[0].cast("int")) \
  .withColumn("creditUsage", col('data')[1].cast("float")) \
  .drop('data')

Output:

+------+---------+-----------+
|userID|timestamp|creditUsage|
+------+---------+-----------+
|   123|        1|        0.0|
|   123|        2|        0.0|
|   123|        3|        3.0|
|   123|        4|        0.0|
|   123|        5|        1.0|
|   123|        6|        1.0|
|   123|        7|        1.0|
|   123|        8|        0.0|
|   123|        9|        0.0|
|   123|       10|        2.0|
+------+---------+-----------+
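
As a quick check of the running-balance rule, here is a minimal sketch in plain Python (no Spark) that applies it to the sample rows from the question and derives realActivity as activity minus the credit used. The function name `real_activity` and the tuple layout are assumptions made for illustration; they are not part of the code above.

```python
def real_activity(rows):
    """Hypothetical helper, for illustration only.

    rows: iterable of (period, activity, credit) tuples for ONE user.
    Returns a list of (period, credit_used, real_activity) tuples.
    """
    balance = 0.0  # credit still available, carried between periods
    result = []
    for period, activity, credit in sorted(rows):
        balance += credit
        # credit can cover at most the activity, and at most the balance
        used = min(balance, activity)
        balance -= used
        result.append((period, used, activity - used))
    return result


# sample rows from the question: (period, activity, credit)
sample = [(1, 5, 0), (2, 0, 3), (3, 4, 0), (4, 0, 3), (5, 1, 0),
          (6, 1, 0), (7, 5, 0), (8, 0, 1), (9, 0, 1), (10, 5, 0)]

for period, used, real in real_activity(sample):
    print(period, used, real)
```

The second value in each tuple corresponds to the creditUsage column shown above, and the third corresponds to the realActivity column requested in the question. In Spark, one option (an assumption, not benchmarked) would be to carry the activity through the UDF output as well, so the subtraction can be done after the explode step.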