
I am trying to stitch a few event rows in a dataframe together based on the time difference between them. I have created a new column in the dataframe which represents the time difference from the previous row, computed using lag. The dataframe looks as follows:

sc=spark.sparkContext
df = spark.createDataFrame(
    sc.parallelize(
        [['x',1, "9999"], ['x',2, "120"], ['x',3, "102"], ['x',4, "3000"],['x',5, "299"],['x',6, "100"]]
    ), 
    ['id',"row_number", "time_diff"]
)

I want to stitch rows together if the time_diff from the previous event is less than 160. For this, I was planning to assign the same new row number to all events that fall within 160 time units of each other, and then do a groupBy on the new row number.

For the above dataframe I wanted the output as:

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             1|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+
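The intended rule can be sketched in plain Python for clarity (this is a hypothetical helper illustrating the logic, not Spark code):

```python
def assign_segments(rows, threshold=160):
    """Each row whose gap is >= threshold starts a new segment labeled by
    its own row_number; rows with smaller gaps inherit the previous label.
    A sketch of the stitching rule described above, not part of any API."""
    label = None
    out = []
    for row_number, time_diff in rows:
        if time_diff >= threshold or label is None:
            label = row_number
        out.append((row_number, time_diff, label))
    return out

events = [(1, 9999), (2, 120), (3, 102), (4, 3000), (5, 299), (6, 100)]
print(assign_segments(events))
# [(1, 9999, 1), (2, 120, 1), (3, 102, 1), (4, 3000, 4), (5, 299, 5), (6, 100, 5)]
```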

I wrote a program as follows:

from pyspark.sql import functions as f
from pyspark.sql.functions import when, col
from pyspark.sql.window import Window

window = Window.partitionBy('id').orderBy('row_number')

df2 = df.withColumn('new_row_number', col('row_number'))
df3 = df2.withColumn('new_row_number', when(col('time_diff') >= 160, col('row_number'))\
                         .otherwise(f.lag(col('new_row_number')).over(window)))

but the output I got was as follows:

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             2|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+

Can someone help me resolve this? Thanks.


1 Answer


You want the previous value of the column that is currently being populated, which is not possible in Spark: a window function cannot read back the values it is in the middle of producing. To achieve the same result, we can do the following:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

window = Window.partitionBy('id').orderBy('row_number')
df3 = df.withColumn('new_row_number', f.when(f.col('time_diff') >= 160, f.col('row_number')))\
        .withColumn('new_row_number', f.last(f.col('new_row_number'), ignorenulls=True).over(window))

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             1|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+

To explain:

First, we write the row_number into new_row_number for every row whose time_diff is at least 160, and leave null everywhere else:

df2=df.withColumn('new_row_number', f.when(f.col('time_diff')>=160, f.col('row_number')))
df2.show()

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|          null|
|  x|         3|      102|          null|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|          null|
+---+----------+---------+--------------+
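For intuition, the fill that the next step performs can be sketched in plain Python (a hypothetical helper, not part of the Spark API):

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value seen so far,
    mimicking what f.last(..., ignorenulls=True) does over a window frame
    that runs from the start of the partition to the current row."""
    last_seen = None
    out = []
    for v in values:
        if v is not None:
            last_seen = v
        out.append(last_seen)
    return out

# The new_row_number column after the when() step, then after the fill:
print(forward_fill([1, None, None, 4, 5, None]))  # [1, 1, 1, 4, 5, 5]
```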

Then we forward-fill the nulls with the last non-null value over the window. Because the window has an orderBy, its default frame runs from the start of the partition to the current row, so f.last with ignorenulls=True picks up the most recent non-null value:

df3=df2.withColumn("new_row_number", f.last(f.col("new_row_number"), ignorenulls=True).over(window))
df3.show()

+---+----------+---------+--------------+
| id|row_number|time_diff|new_row_number|
+---+----------+---------+--------------+
|  x|         1|     9999|             1|
|  x|         2|      120|             1|
|  x|         3|      102|             1|
|  x|         4|     3000|             4|
|  x|         5|      299|             5|
|  x|         6|      100|             5|
+---+----------+---------+--------------+
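To finish the stitching the question describes, you would then group on (id, new_row_number). A minimal plain-Python sketch of that grouping, using the rows from the final table above (in Spark this would be a `df3.groupBy('id', 'new_row_number')` followed by whatever aggregation you need):

```python
from itertools import groupby

# (id, row_number, time_diff, new_row_number) rows from df3 above.
rows = [('x', 1, 9999, 1), ('x', 2, 120, 1), ('x', 3, 102, 1),
        ('x', 4, 3000, 4), ('x', 5, 299, 5), ('x', 6, 100, 5)]

# Group consecutive rows sharing the same (id, new_row_number) key,
# collecting the original row numbers stitched into each segment.
stitched = [(key, [r[1] for r in grp])
            for key, grp in groupby(rows, key=lambda r: (r[0], r[3]))]
print(stitched)
# [(('x', 1), [1, 2, 3]), (('x', 4), [4]), (('x', 5), [5, 6])]
```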

Hope this solves your problem.