I have a DataFrame and I want to calculate some quantities on a daily basis. Say we have 10 columns col1, col2, col3, col4, ..., coln, where each column depends on the values of col1, col2, col3, col4, and so on, and the dates restart for each id.
+----------+----+---+----+---+----+
|      date|col1| id|col2|...|coln|
+----------+----+---+----+---+----+
|2020-08-01|   0| M1| ...|...|   3|
|2020-08-02|   4| M1| ...|...|  10|
|2020-08-03|   3| M1| ...|...|   9|
|2020-08-04|   2| M1| ...|...|   8|
|2020-08-05|   1| M1| ...|...|   7|
|2020-08-06|   0| M1| ...|...|   0|
|2020-08-01|   0| M2| ...|...|   0|
|2020-08-02|   0| M2| ...|...|   1|
|2020-08-03|   0| M2| ...|...|   2|
+----------+----+---+----+---+----+
Say we compute this DataFrame; there could be a lot more columns in it.
To make this clear, say today's date is 2020-08-01. We do some calculation and get an output in coln, say coln = 3 on 2020-08-01. I then want that coln value to become col1 on 2020-08-02 (so col1 = 3), carry on the calculation for 2020-08-02, and so on. An example of the resulting df is below:
+----------+----+---+----+---+----+
|      date|col1| id|col2|...|coln|
+----------+----+---+----+---+----+
|2020-08-01|   0| M1| ...|...|   3|
|2020-08-02|   3| M1| ...|...|  10|
|2020-08-03|  10| M1| ...|...|   9|
|2020-08-04|   9| M1| ...|...|   8|
|2020-08-05|   8| M1| ...|...|   7|
|2020-08-06|   7| M1| ...|...|   0|
|2020-08-01|   0| M2| ...|...|   1|
|2020-08-02|   1| M2| ...|...|   2|
|2020-08-03|   2| M2| ...|...|   0|
+----------+----+---+----+---+----+
It would be great if someone could give me an example of how this can be done in PySpark.
Example: say col3 = col1 + col2, and initially col1 is all 0.
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# sqlContext is assumed to already exist (e.g. in the PySpark shell)
df1_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("coln", IntegerType(), True),
])
df_data = [
    ('2020-08-01', 0, 'M1', 3, 3, 2), ('2020-08-02', 0, 'M1', 2, 3, 1),
    ('2020-08-03', 0, 'M1', 3, 3, 3), ('2020-08-04', 0, 'M1', 3, 3, 1),
    ('2020-08-01', 0, 'M2', 1, 3, 1), ('2020-08-02', 0, 'M2', -1, 3, 2),
]
df1 = sqlContext.createDataFrame(df_data, df1_schema)
# Parse the string dates into a proper DateType column
df1 = df1.withColumn("Date", to_date("Date", "yyyy-MM-dd"))
df1.show()
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 0| M1| 2| 3| 1|
|2020-08-03| 0| M1| 3| 3| 3|
|2020-08-04| 0| M1| 3| 3| 1|
|2020-08-01| 0| M2| 1| 3| 1|
|2020-08-02| 0| M2| -1| 3| 2|
+----------+----+---+----+----+----+
So let's focus on 2020-08-01, which is the beginning. What we want there is col3 = col1 + col2, which is 3. After the remaining calculations, which depend on col3, col4, col5, and so on, say we arrive at some number coln = 3. Once that calculation is done, that coln = 3 should become col1 on 2020-08-02.
So col1 changes dynamically, and only after the 2020-08-01 calculation is complete.
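To pin down the rule, here is a minimal plain-Python sketch (not PySpark) of the row-by-row carry I have in mind. The names `carry_forward` and `compute`, and the formula coln = col3 - 1, are placeholders made up for illustration; only col3 = col1 + col2 comes from the example above. The point is that when coln depends on col1, each day can only be computed after the previous day for the same id.

```python
from itertools import groupby
from operator import itemgetter


def carry_forward(rows, compute):
    """Process rows per id in date order, feeding each day's coln into the next day's col1.

    rows: list of dicts with keys 'id', 'Date' (ISO string), 'col1', 'col2'.
    compute: function (col1, col2) -> dict of derived columns, must include 'coln'.
    """
    out = []
    ordered = sorted(rows, key=itemgetter("id", "Date"))
    for _, group in groupby(ordered, key=itemgetter("id")):
        prev_coln = None  # the carry resets at the first date of every id
        for row in group:
            new_row = dict(row)
            if prev_coln is not None:
                new_row["col1"] = prev_coln  # yesterday's coln becomes today's col1
            new_row.update(compute(new_row["col1"], new_row["col2"]))
            prev_coln = new_row["coln"]
            out.append(new_row)
    return out


def compute(col1, col2):
    col3 = col1 + col2   # from the question
    coln = col3 - 1      # hypothetical stand-in for the "nth calculation"
    return {"col3": col3, "coln": coln}


rows = [
    {"Date": "2020-08-01", "id": "M1", "col1": 0, "col2": 3},
    {"Date": "2020-08-02", "id": "M1", "col1": 0, "col2": 2},
    {"Date": "2020-08-03", "id": "M1", "col1": 0, "col2": 3},
    {"Date": "2020-08-04", "id": "M1", "col1": 0, "col2": 3},
    {"Date": "2020-08-01", "id": "M2", "col1": 0, "col2": 1},
    {"Date": "2020-08-02", "id": "M2", "col1": 0, "col2": -1},
]
result = carry_forward(rows, compute)
for r in result:
    print(r["Date"], r["id"], r["col1"], r["col2"], r["col3"], r["coln"])
```

With this placeholder formula, col1 for M1 comes out as 0, 2, 3, 5 and restarts at 0 for M2. The numbers only illustrate the carry and are not real data.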
So my desired df would look like this:
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 2| M1| 2| 5| 1|
|2020-08-03| 1| M1| 3| 4| 3|
|2020-08-04| 3| M1| 3| 6| 1|
|2020-08-01| 1| M2| 1| 4| 1|
|2020-08-02| 1| M2| -1| 0| 2|
+----------+----+---+----+----+----+
EDIT 2:
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# sqlContext is assumed to already exist (e.g. in the PySpark shell)
df1_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", IntegerType(), True),
    StructField("coln", IntegerType(), True),
])
df_data = [
    ('2020-08-01', 0, 'M1', 3, 3, 2, 2), ('2020-08-02', 0, 'M1', 2, 3, 0, 1),
    ('2020-08-03', 0, 'M1', 3, 3, 2, 3), ('2020-08-04', 0, 'M1', 3, 3, 2, 1),
    ('2020-08-01', 0, 'M2', 1, 3, 3, 1), ('2020-08-02', 0, 'M2', -1, 3, 1, 2),
]
df1 = sqlContext.createDataFrame(df_data, df1_schema)
# Parse the string dates into a proper DateType column
df1 = df1.withColumn("Date", to_date("Date", "yyyy-MM-dd"))
df1.show()
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| 2|
|2020-08-02| 0| M1| 2| 3| 0| 1|
|2020-08-03| 0| M1| 3| 3| 2| 3|
|2020-08-04| 0| M1| 3| 3| 2| 1|
|2020-08-01| 0| M2| 1| 3| 3| 1|
|2020-08-02| 0| M2| -1| 3| 1| 2|
+----------+----+---+----+----+----+----+
So let's say coln = col4 - col2 (and col3 = col1 + col2 as before); then the desired output is:
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| -1|
|2020-08-02| -1| M1| 2| 1| 0| -2|
|2020-08-03| -2| M1| 3| 1| 2| -1|
|2020-08-04| -1| M1| 3| 2| 2| -1|
|2020-08-01| 0| M2| 1| 1| 3| 2|
|2020-08-02| 2| M2| -1| 1| 1| 2|
+----------+----+---+----+----+----+----+
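As a sanity check on the table above, here is a small plain-Python sketch (not PySpark) that applies the rules from this edit: coln = col4 - col2, col3 = col1 + col2, and col1 taken from the previous date's coln within the same id, starting at 0. The function name `expected_rows` and the tuple layout are assumptions made for illustration. Note that in this particular case coln does not depend on col1, so the carry is just a one-row shift of coln per id.

```python
from itertools import groupby
from operator import itemgetter

# (Date, id, col2, col4) taken from the EDIT 2 input data
inputs = [
    ("2020-08-01", "M1", 3, 2),
    ("2020-08-02", "M1", 2, 0),
    ("2020-08-03", "M1", 3, 2),
    ("2020-08-04", "M1", 3, 2),
    ("2020-08-01", "M2", 1, 3),
    ("2020-08-02", "M2", -1, 1),
]


def expected_rows(rows):
    """Return (Date, col1, id, col2, col3, col4, coln) tuples, ordered by id then Date."""
    result = []
    ordered = sorted(rows, key=itemgetter(1, 0))  # sort by id, then Date
    for _, group in groupby(ordered, key=itemgetter(1)):
        col1 = 0  # col1 starts at 0 on the first date of each id
        for date, id_, col2, col4 in group:
            col3 = col1 + col2
            coln = col4 - col2
            result.append((date, col1, id_, col2, col3, col4, coln))
            col1 = coln  # today's coln becomes tomorrow's col1
    return result


expected = expected_rows(inputs)
for row in expected:
    print(row)
```

The printed tuples follow the same column order as the table, which makes it easy to compare against whatever the PySpark version produces.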
w1 = Window.partitionBy('id').orderBy('Date'), then do df1.withColumn('col1', F.coalesce(F.lag('coln').over(w1), F.col('col1'))).withColumn('col3', F.col('col1') + F.col('col2')).show(). But this does not match your desired df; which one is correct? – jxc