I have a stream of records that is keyed by two fields and then assigned a session window with a gap of 30 seconds. I use the timestamp attached to each record as the event time, and I generate watermarks with assignAscendingTimestamps.
Consider the records below as an example. The stream is keyed by (user, place).
Record1 : user1, place1, timestamp t1
Record2 : user2, place1, timestamp 30 seconds after t1
Record3 : user1, place1, timestamp within 30 seconds of t1
Record4 : user1, place1, timestamp 30 seconds after t1
Because the stream is keyed, Record2 (which belongs to user2) should fall into a different bucket. I was therefore expecting Record1, Record3 and Record4 to belong to one bucket and Record2 to belong to another:
Bucket1
Record1 : user1, place1, timestamp t1
Record3 : user1, place1, timestamp within 30 seconds of t1
Record4 : user1, place1, timestamp 30 seconds after t1
Bucket2
Record2 : user2, place1, timestamp 30 seconds after t1
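To make the expected grouping concrete, here is a minimal sketch in plain Python (not Flink code). It groups the records by key and starts a new session whenever the gap to the previous record of the same key reaches 30 seconds. The millisecond offsets from t1 (20 s for Record3, 30 s for Record2 and Record4) and the helper name expected_sessions are assumptions chosen for illustration.

```python
from collections import defaultdict

GAP_MS = 30_000  # session gap from the question

# (name, user, place, timestamp in ms relative to t1); the offsets are assumed.
RECORDS = [
    ("Record1", "user1", "place1", 0),
    ("Record2", "user2", "place1", 30_000),
    ("Record3", "user1", "place1", 20_000),
    ("Record4", "user1", "place1", 30_000),
]


def expected_sessions(records, gap_ms=GAP_MS):
    """Group records per key, splitting when the gap to the previous record is >= gap_ms."""
    by_key = defaultdict(list)
    for name, user, place, ts in records:
        by_key[(user, place)].append((ts, name))

    sessions = {}
    for key, items in by_key.items():
        items.sort()  # order by event time within the key
        groups = [[items[0]]]
        for ts, name in items[1:]:
            last_ts = groups[-1][-1][0]
            if ts - last_ts < gap_ms:
                groups[-1].append((ts, name))   # still inside the current session
            else:
                groups.append([(ts, name)])     # gap exceeded: start a new session
        sessions[key] = [[name for _, name in group] for group in groups]
    return sessions


result = expected_sessions(RECORDS)
for key, groups in result.items():
    print(key, groups)
```

With these assumed offsets, the sketch puts Record1, Record3 and Record4 in one session for (user1, place1) and Record2 alone in a session for (user2, place1), which matches the buckets listed above.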
As I understand it, a session window containing Record1 and Record3 should be triggered only when Record4 arrives. But when I run the code, a session that contains only Record1 is triggered as soon as Record2 arrives, because Record2's timestamp lies beyond the 30-second gap after Record1's timestamp, even though Record2 has a different key. I have gone through Flink's documentation and several session window examples I could find online, but I have not been able to solve this. Is there something I am missing here? Could this be caused by the ascending timestamps watermark?
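To test the suspicion in the last question, here is a toy model in plain Python (not Flink code and not Flink's actual implementation). It keeps sessions per key but advances a single watermark shared by all keys, set to the largest timestamp seen minus 1 ms. The millisecond offsets, the function name simulate, and the choice to advance the watermark before handling each record are all assumptions made to keep the sketch short.

```python
GAP_MS = 30_000  # session gap from the question

# Arrival order from the question; the millisecond offsets from t1 are assumed.
ARRIVALS = [
    ("Record1", ("user1", "place1"), 0),
    ("Record2", ("user2", "place1"), 30_000),
    ("Record3", ("user1", "place1"), 20_000),
    ("Record4", ("user1", "place1"), 30_000),
]


def simulate(arrivals, gap_ms=GAP_MS):
    """Toy model: per-key sessions, but ONE watermark shared by every key."""
    open_sessions = {}          # key -> {"end": exclusive end, "records": [...]}
    watermark = float("-inf")   # shared across keys (the assumption under test)
    fired = []                  # (record that advanced the watermark, key, records)

    for name, key, ts in arrivals:
        # Ascending-timestamp style watermark: largest timestamp seen, minus 1 ms.
        watermark = max(watermark, ts - 1)

        # Fire every session, for any key, whose last millisecond is now covered.
        ready = [k for k, s in open_sessions.items() if s["end"] - 1 <= watermark]
        for k in ready:
            fired.append((name, k, open_sessions.pop(k)["records"]))

        # A record whose own window already lies behind the watermark is late.
        if ts + gap_ms - 1 <= watermark:
            continue

        session = open_sessions.get(key)
        if session is None:
            open_sessions[key] = {"end": ts + gap_ms, "records": [name]}
        else:
            session["records"].append(name)
            session["end"] = max(session["end"], ts + gap_ms)

    return fired, open_sessions


fired, still_open = simulate(ARRIVALS)
for trigger, key, names in fired:
    print(f"{trigger} advanced the watermark and fired {key}: {names}")
print("still open:", {k: s["records"] for k, s in still_open.items()})
```

Under these assumptions the model reproduces what I observe: Record2 pushes the shared watermark past the end of Record1's session, so a session containing only Record1 fires, and Record3 and Record4 land in a new session that is still open. If the watermark were tracked per key instead, Record2 would not affect the (user1, place1) session at all.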