1 vote

I have a PySpark DataFrame with three columns: id, seq, and seq_checker. I need to order by id and check for 4 consecutive 1's in the seq_checker column.

I tried using window functions, but I'm unable to change all values in a window based on a condition.

from pyspark.sql import Window
import pyspark.sql.functions as F

new_window = Window.partitionBy().orderBy("id").rangeBetween(0, 3)
output = df.withColumn('check_sequence', F.when(F.min(df['seq_checker']).over(new_window) == 1, True))

Original PySpark df:

+---+---+-----------+--------------+
| Id|seq|seq_checker|check_sequence|
+---+---+-----------+--------------+
|  1|  1|          1|         false|
|  2|  2|          1|         false|
|  3|  3|          1|         false|
|  4|  4|          1|         false|
|  5| 10|          0|         false|
|  6| 14|          1|         false|
|  7| 13|          1|         false|
|  8| 18|          0|         false|
|  9| 23|          0|         false|
| 10|  5|          0|         false|
| 11| 56|          0|         false|
| 12| 66|          0|         false|
| 13| 34|          1|         false|
| 14| 35|          1|         false|
| 15| 36|          1|         false|
| 16| 37|          1|         false|
| 17| 39|          0|         false|
| 18| 54|          0|         false|
| 19| 68|          0|         false|
| 20| 90|          0|         false|
+---+---+-----------+--------------+

Required output:

+---+---+-----------+--------------+
| Id|seq|seq_checker|check_sequence|
+---+---+-----------+--------------+
|  1|  1|          1|          true|
|  2|  2|          1|          true|
|  3|  3|          1|          true|
|  4|  4|          1|          true|
|  5| 10|          0|         false|
|  6| 14|          1|         false|
|  7| 13|          1|         false|
|  8| 18|          0|         false|
|  9| 23|          0|         false|
| 10|  5|          0|         false|
| 11| 56|          0|         false|
| 12| 66|          0|         false|
| 13| 34|          1|          true|
| 14| 35|          1|          true|
| 15| 36|          1|          true|
| 16| 37|          1|          true|
| 17| 39|          0|         false|
| 18| 54|          0|         false|
| 19| 68|          0|         false|
| 20| 90|          0|         false|
+---+---+-----------+--------------+

Based on the above code, my output is:

+---+---+-----------+--------------+
| Id|seq|seq_checker|check_sequence|
+---+---+-----------+--------------+
|  1|  1|          1|          true|
|  2|  2|          1|          null|
|  3|  3|          1|          null|
|  4|  4|          1|          null|
|  5| 10|          0|          null|
|  6| 14|          1|          null|
|  7| 13|          1|          null|
|  8| 18|          0|          null|
|  9| 23|          0|          null|
| 10|  5|          0|          null|
| 11| 56|          0|          null|
| 12| 66|          0|          null|
| 13| 34|          1|          true|
| 14| 35|          1|          null|
| 15| 36|          1|          null|
| 16| 37|          1|          null|
| 17| 39|          0|          null|
| 18| 54|          0|          null|
| 19| 68|          0|          null|
| 20| 90|          0|          null|
+---+---+-----------+--------------+

Edit:

1. If we have more than 4 consecutive rows having 1's, we need to set the check_sequence flag to True for all of those rows.
2. My actual problem is to check for sequences of length greater than 4 in the 'seq' column. I was able to create the seq_checker column using lead and lag functions (a sketch of one possible way is below).
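
For reference, a minimal sketch of one way seq_checker could be derived with lag and lead. The rule below (flag a row when its seq differs by exactly 1 from either neighbour) is an assumption reconstructed from the sample data, not the asker's actual code:

import pyspark.sql.functions as F
from pyspark.sql import Window

w = Window.orderBy("Id")
# Assumed rule: a row is part of a run when seq differs by exactly 1
# from the previous or the next row.
is_run = ((F.abs(F.col("seq") - F.lag("seq", 1).over(w)) == 1) |
          (F.abs(F.col("seq") - F.lead("seq", 1).over(w)) == 1))
# coalesce() turns the nulls that lag/lead produce at the edges into False.
df = df.withColumn("seq_checker", F.coalesce(is_run, F.lit(False)).cast("int"))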
How do you treat 5 consecutive 1's? - cronoik
Need 5 consecutive True's. - Kas1

2 Answers

0 votes

Initially define a window with just an id ordering. Then use the difference-in-row-numbers approach (with a different ordering) to assign the same group number to consecutive 1's; note it groups any run of identical values, not just 1's. Once the grouping is done, check that the minimum and maximum of the group are both 1 and that the group contains at least 4 rows, to get the desired boolean output.

from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, min, max

# The difference of the two row numbers is constant within a run of identical values.
w1 = Window.orderBy(df.id)
w2 = Window.orderBy(df.seq_checker, df.id)
groups = df.withColumn('grp', row_number().over(w1) - row_number().over(w2))

# A group qualifies if it consists entirely of 1's and spans at least 4 rows.
w3 = Window.partitionBy(groups.grp)
output = groups.withColumn('check_seq',
                           (max(groups.seq_checker).over(w3) == 1) &
                           (min(groups.seq_checker).over(w3) == 1) &
                           (count(groups.id).over(w3) >= 4))
output.show()
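
For the sample data, the row-number difference assigns grp = -10 to Ids 1-4, -9 to Ids 6-7, -4 to Ids 13-16, and positive group numbers to the runs of 0's. Only the groups for Ids 1-4 and 13-16 consist entirely of 1's and contain at least 4 rows, so only those rows end up with check_seq = true.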
0 votes

rangeBetween gives you access to rows relative to the current row. You defined a window of (0, 3), which covers the current row and the three following rows, but that only sets the correct value for the first of four consecutive 1's. The second element of such a run needs access to the previous row and the following two rows (-1, 2), the third needs the two previous rows and the following row (-2, 1), and the fourth needs the three previous rows (-3, 0).

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [
(  1,  1,          1),
(  2,  2,          1),
(  3,  3,          1),
(  4,  4,          1),
(  5, 10,          0),
(  6, 14,          1),
(  7, 13,          1),
(  8, 18,          0),
(  9, 23,          0),
( 10,  5,          0),
( 11, 56,          0),
( 12, 66,          0),
( 13, 34,          1),
( 14, 35,          1),
( 15, 36,          1),
( 16, 37,          1),
( 17, 39,          0),
( 18, 54,          0),
( 19, 68,          0),
( 20, 90,          0)
]

columns = ['Id', 'seq', 'seq_checker']

df = spark.createDataFrame(l, columns)

# One window per position the current row can occupy inside a run of four.
w1 = Window.partitionBy().orderBy("id").rangeBetween(0, 3)   # first of four
w2 = Window.partitionBy().orderBy("id").rangeBetween(-1, 2)  # second of four
w3 = Window.partitionBy().orderBy("id").rangeBetween(-2, 1)  # third of four
w4 = Window.partitionBy().orderBy("id").rangeBetween(-3, 0)  # fourth of four

# A row belongs to a run of four 1's if any of the four frames is all 1's.
output = df.withColumn('check_sequence', F.when(
                                            (F.min(df['seq_checker']).over(w1) == 1) |
                                            (F.min(df['seq_checker']).over(w2) == 1) |
                                            (F.min(df['seq_checker']).over(w3) == 1) |
                                            (F.min(df['seq_checker']).over(w4) == 1),
                        True).otherwise(False))
output.show()

Output:

+---+---+-----------+--------------+
| Id|seq|seq_checker|check_sequence|
+---+---+-----------+--------------+
|  1|  1|          1|          true|
|  2|  2|          1|          true|
|  3|  3|          1|          true|
|  4|  4|          1|          true|
|  5| 10|          0|         false|
|  6| 14|          1|         false|
|  7| 13|          1|         false|
|  8| 18|          0|         false|
|  9| 23|          0|         false|
| 10|  5|          0|         false|
| 11| 56|          0|         false|
| 12| 66|          0|         false|
| 13| 34|          1|          true|
| 14| 35|          1|          true|
| 15| 36|          1|          true|
| 16| 37|          1|          true|
| 17| 39|          0|         false|
| 18| 54|          0|         false|
| 19| 68|          0|         false|
| 20| 90|          0|         false|
+---+---+-----------+--------------+
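
The comments mention needing five consecutive 1's rather than four. The same idea generalizes by building one window per offset and OR-ing the tests; below is a hedged sketch (run_flag is a hypothetical helper, not part of the original answer). Note that rangeBetween behaves like rowsBetween here only because Id increases in steps of exactly 1; if Id had gaps, rowsBetween would be the safer frame.

import pyspark.sql.functions as F
from pyspark.sql import Window

def run_flag(df, n, order_col="Id", value_col="seq_checker"):
    # One frame per position the current row can occupy inside a run of n.
    cond = F.lit(False)
    for i in range(n):
        w = Window.partitionBy().orderBy(order_col).rangeBetween(-i, n - 1 - i)
        cond = cond | (F.min(df[value_col]).over(w) == 1)
    return df.withColumn("check_sequence", F.when(cond, True).otherwise(False))

output = run_flag(df, 5)  # n=4 reproduces the example above
output.show()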