0
votes

I have a Spark dataframe (Pyspark 2.2.0) that contains events, each has a timestamp. There is an additional column that contains series of tags (A,B,C or Null). I would like to calculate for each row - by group of events, ordered by timestamp - a count of the current longest stretch of changes of non Null tags (Null should reset this count to 0). Example of df with my ideal calculated column called stretch:

event timestamp   tag    stretch
G1    09:59:00    Null   0
G1    10:00:00    A      1  ---> first non Null tag starts the count
G1    10:01:00    A      1  ---> no change of tag
G1    10:02:00    B      2  ---> change of tag (A to B)
G1    10:03:00    A      3  ---> change of tag (B to A)
G1    10:04:00    Null   0  ---> Null resets the count
G1    10:05:00    A      1  ---> first non Null tag restarts the count

G2    10:00:00    B      1  ---> first non Null tag starts the count
G2    10:01:00    C      2  ---> change of tag (B to C)

In Pyspark I can define a window like this:

window = Window.partitionBy("event").orderBy(col("timestamp").asc())

and calculate for example the change of tag:

df = df.withColumn("change_of_tag",col("tag")!=lag("tag",1).over(window))

but I cannot find how to calculate a cumulative sum of these changes that would reset each time a Null tag is encountered. I suspect that I should define a new window partitioned by event and type of tag (Null or not Null) but I don't know how to partition by event, order by timestamp and after that, group by type of tag.

2
Did you try using .groupBy and sum based on aggs? - pvy4917
can you please add the expected result? - Ali Yesilli
@AliYesilli Expected result is already there, it's the stretch column, sorry if it was not clear. - Patrick
@Prazy Problem is I don't know how to group after an orderby (I have to group by event, order by timestamp and then after, group by non null/null tag). - Patrick

2 Answers

1
votes

I think it is very tricky case. Especially 'no change of tag' situation is difficult to handle in one process. So you can find my solution below. I have to create some new calculated columns to obtain the result

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.window import Window
>>> 
>>> df.show()
+-----+---------+----+
|event|timestamp| tag|
+-----+---------+----+
|   G1| 09:59:00|null|
|   G1| 10:00:00|   A|
|   G1| 10:01:00|   A|
|   G1| 10:02:00|   B|
|   G1| 10:03:00|   A|
|   G1| 10:04:00|null|
|   G1| 10:05:00|   A|
|   G2| 10:00:00|   B|
|   G2| 10:01:00|   C|
+-----+---------+----+

>>> df = df.withColumn('new_col1', F.when(F.isnull('tag'),1).otherwise(0))
>>> 
>>> window1 = Window.partitionBy('event').orderBy('timestamp')
>>> 
>>> df = df.withColumn('new_col2', F.row_number().over(window1))
>>> df = df.withColumn('new_col3', F.lag('tag').over(window1))
>>> df = df.withColumn('new_col4', F.lag('new_col2').over(window1))
>>> df = df.withColumn('new_col4', F.when(df['new_col3']==df['tag'],df['new_col4']).otherwise(df['new_col2']))
>>> df = df.withColumn('new_col5', F.sum('new_col1').over(window1))
>>> df = df.withColumn('new_col5', F.when(F.isnull('tag'),None).otherwise(df['new_col5']))
>>> 
>>> window2 = Window.partitionBy('event','new_col5').orderBy('new_col4')
>>> 
>>> df = df.withColumn('new_col6', F.when(F.isnull('tag'),0).otherwise(F.dense_rank().over(window2)))
>>> df = df.select('event','timestamp','tag', df['new_col6'].alias('stretch'))
>>> 
>>> df.sort(["event", "timestamp"], ascending=[1, 1]).show()
+-----+---------+----+-------+                                                  
|event|timestamp| tag|stretch|
+-----+---------+----+-------+
|   G1| 09:59:00|null|      0|
|   G1| 10:00:00|   A|      1|
|   G1| 10:01:00|   A|      1|
|   G1| 10:02:00|   B|      2|
|   G1| 10:03:00|   A|      3|
|   G1| 10:04:00|null|      0|
|   G1| 10:05:00|   A|      1|
|   G2| 10:00:00|   B|      1|
|   G2| 10:01:00|   C|      2|
+-----+---------+----+-------+
0
votes

Code revised and fixed:

df = spark.createDataFrame([\
        ("G1", 113, "-1"),("G1", 114, "A"),("G1", 115, "A"),("G1", 116, "A"),\ 
        ("G1", 117, "B"),("G1", 118, "A"),("G1", 119, "-1"),\
        ("G1", 120, "A"),("G2", 121, "B"),("G2", 122, "C")],["event","timestamp","tag"])

df = df.withColumn("tag",when(col("tag")=="-1",lit(None)).otherwise(col("tag")))

window_trip = Window.partitionBy('event').orderBy('timestamp')

df = df.withColumn('in_out', when(\
        (row_number().over(window_trip)>1) & 
(  ( (col('tag').isNull()) &     (lag('tag').over(window_trip).isNotNull())) \
        | ( (col('tag').isNotNull()) &  (lag('tag').over(window_trip).isNull()) \
        ) \
    ),1) \
    .otherwise(0))
df = df.withColumn('group', sum('in_out').over(window_trip))
df = df.withColumn('tag_change', ((( (col('tag')!=lag('tag').over(window_trip)) ) | (row_number().over(window_trip)==1))).cast("int")  )
df = df.withColumn('tag_rank', sum('tag_change').over(window_trip) )
window2 = Window.partitionBy('event','group').orderBy('tag_rank')
df = df.withColumn('stretch', when(col('tag').isNull(),0).otherwise(dense_rank().over(window2)))
df.sort(["event", "timestamp"], ascending=[1, 1]).show()

Thanks again to @AliYesilli, you gave me the hints and the dense_rank fct!