I am trying to calculate the part of the activity value that does not originate from extra credit.
Input:
+------+--------+------+
|period|activity|credit|
+------+--------+------+
| 1| 5| 0|
| 2| 0| 3|
| 3| 4| 0|
| 4| 0| 3|
| 5| 1| 0|
| 6| 1| 0|
| 7| 5| 0|
| 8| 0| 1|
| 9| 0| 1|
| 10| 5| 0|
+------+--------+------+
Expected output:
rdd = sc.parallelize([(1,5,0,5),(2,0,3,0),(3,4,0,1),(4,0,3,0),(5,1,0,0),(6,1,0,0),(7,5,0,4),(8,0,1,0),(9,0,1,0),(10,5,0,3)])
df = rdd.toDF(["period","activity","credit","realActivity"])
+------+--------+------+------------+
|period|activity|credit|realActivity|
+------+--------+------+------------+
| 1| 5| 0| 5|
| 2| 0| 3| 0|
| 3| 4| 0| 1|
| 4| 0| 3| 0|
| 5| 1| 0| 0|
| 6| 1| 0| 0|
| 7| 5| 0| 4|
| 8| 0| 1| 0|
| 9| 0| 1| 0|
| 10| 5| 0| 3|
+------+--------+------+------------+
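Reading the expected output, the rule seems to be: credit accumulates into a balance, later activity uses that balance up first, and realActivity is whatever activity is left over (e.g. period 3: activity 4 minus the 3 credit from period 2 leaves 1). Below is a minimal sequential sketch of that inferred rule in plain Python. The function name real_activity is hypothetical, and it assumes that credit in a row is applied before that row's activity (the sample data never has both in one row).

```python
def real_activity(rows):
    """Hypothetical helper. rows: (activity, credit) pairs, sorted by period.

    Assumes credit builds up a balance that later activity consumes first,
    and that a row's credit is applied before that row's activity.
    """
    balance = 0  # unused credit carried forward
    result = []
    for activity, credit in rows:
        available = balance + credit
        # activity not covered by available credit
        result.append(max(0, activity - available))
        # leftover credit; the balance is reset to 0 instead of going negative
        balance = max(0, available - activity)
    return result


rows = [(5, 0), (0, 3), (4, 0), (0, 3), (1, 0),
        (1, 0), (5, 0), (0, 1), (0, 1), (5, 0)]
print(real_activity(rows))  # [5, 0, 1, 0, 0, 0, 4, 0, 0, 3]
```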
I tried to create a credit-balance column that adds credit and deducts activity row by row, but I could not make it reset conditionally (every time it goes below zero), because the reset depends on the column's own previous value. It looks like a recursive problem, and I am not sure how to turn it into something PySpark-friendly. Obviously, I can't do the following, which references the previous value of the very column being computed:
from pyspark.sql import Window
from pyspark.sql.functions import lag
w = Window.orderBy("period")
df = df.withColumn("realActivity", lag("realActivity", 1, 0).over(w) - lag("credit", 1, 0).over(w) - lag("activity", 1, 0).over(w))
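Written out as a recurrence (hypothetical notation: $a_t$ = activity, $c_t$ = credit, $b_t$ = leftover credit after period $t$, $r_t$ = realActivity; the rule is inferred from the expected output and assumes a row's credit is applied before its activity), each balance depends on the previous one, which is exactly the self-reference a single window expression cannot express:

$$
\begin{aligned}
b_0 &= 0,\\
b_t &= \max\bigl(0,\; b_{t-1} + c_t - a_t\bigr),\\
r_t &= \max\bigl(0,\; a_t - c_t - b_{t-1}\bigr).
\end{aligned}
$$

For example, period 3 has $b_2 = 3$ and $a_3 = 4$, so $r_3 = 1$ and $b_3 = 0$.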
Update: As was pointed out, this is not possible with a window calculation. Therefore, I would like to do something like the (pandas) snippet below to calculate a creditBalance column, from which I can then derive realActivity.
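# pandas; assumes the default RangeIndex (0, 1, 2, ...) and rows sorted by period
df['creditBalance'] = 0
for i in range(1, len(df)):
    if df.loc[i-1, 'creditBalance'] > 0:
        # previous balance still positive: carry it forward
        df.loc[i, 'creditBalance'] = df.loc[i-1, 'creditBalance'] + df.loc[i, 'credit'] - df.loc[i, 'activity']
    elif df.loc[i, 'credit'] > 0:
        # previous balance used up: restart from this row's credit
        df.loc[i, 'creditBalance'] = df.loc[i, 'credit'] - df.loc[i, 'activity']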
Now, my only question is: how can I apply this "local" function to each group in a Spark DataFrame?
- Write the DataFrame to files by group and process them locally?
- Use a custom map and collect the rows for local execution?
- Collapse the rows of each group into a single row and process that?
- Anything else?
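One option is to keep the sequential loop but run it once per group. In Spark 3.0+, `df.groupBy(...).applyInPandas(func, schema)` calls a Python function on each group as a pandas DataFrame and concatenates the returned DataFrames, so the function only needs to sort its group by period and run the loop. The block below is a plain-Python stand-in for that pattern (no Spark needed): `itertools.groupby` plays the role of the shuffle by group, and the `group` column, the sample rows and the name process_group are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical rows: (group, period, activity, credit).
rows = [
    ("a", 1, 5, 0), ("a", 2, 0, 3), ("a", 3, 4, 0),
    ("b", 1, 0, 2), ("b", 2, 3, 0),
]


def process_group(group_rows):
    """Sequential credit-balance pass over one group, ordered by period."""
    balance = 0
    out = []
    for grp, period, activity, credit in sorted(group_rows, key=itemgetter(1)):
        available = balance + credit  # assumption: credit applies before activity
        real = max(0, activity - available)
        balance = max(0, available - activity)
        out.append((grp, period, activity, credit, real))
    return out


# groupby only groups adjacent rows, so sort by the group key first
result = []
for _, grp_rows in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
    result.extend(process_group(grp_rows))
print(result)
```

Inside applyInPandas the per-group function would receive and return a pandas DataFrame instead of tuples, and the schema argument has to describe the returned columns (for example a DDL string such as "group string, period long, activity long, credit long, realActivity long"). The work stays sequential only within a group, so this is workable as long as each single group fits in one executor's memory.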
Comments:
- … realActivity is calculated? For example, how are the values for period 1 and 2 computed? (pansen)
- … lag and cumulative sum. If your data is not too large, you could give it a shot in pandas. (pansen)
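Following the cumulative-sum hint, one possible non-recursive route (a sketch under the same inferred rule, not a confirmed answer) uses the standard identity for a running sum clamped at zero: with $S_t = \sum_{k \le t}(c_k - a_k)$, $S_0 = 0$ and $m_t = \min_{0 \le k \le t} S_k$, the leftover credit is $b_t = S_t - m_t$ and realActivity is $r_t = m_{t-1} - m_t$, i.e. the amount by which the running minimum drops. The block checks this in plain Python (real_activity_cumsum is a hypothetical name). In PySpark the same steps would presumably map to `F.sum` and `F.min` over `Window.partitionBy(<group column>).orderBy("period").rowsBetween(Window.unboundedPreceding, Window.currentRow)`, `F.least` with `F.lit(0)` for the zero floor, and `F.lag(..., 1, 0)` over the same partition and ordering without a frame clause; treat that mapping as untested.

```python
from itertools import accumulate


def real_activity_cumsum(rows):
    """rows: (activity, credit) pairs sorted by period (hypothetical helper)."""
    net = [credit - activity for activity, credit in rows]
    # S_t: running sum of net credit
    running_sum = list(accumulate(net))
    # m_t = min(S_0, ..., S_t) with S_0 = 0, i.e. a running minimum floored at 0
    running_min = list(accumulate([0] + running_sum, min))
    # r_t = m_{t-1} - m_t: how far the running minimum drops in period t
    return [prev - cur for prev, cur in zip(running_min, running_min[1:])]


rows = [(5, 0), (0, 3), (4, 0), (0, 3), (1, 0),
        (1, 0), (5, 0), (0, 1), (0, 1), (5, 0)]
print(real_activity_cumsum(rows))  # [5, 0, 1, 0, 0, 0, 4, 0, 0, 3]
```

Every step here is a running aggregate or a one-row lag, which is why it does not need the self-reference from the original window attempt.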