
I have a stream of records that is keyed by two fields and then assigned to session windows with a gap of 30 seconds. I use the timestamp attached to each record as the event time, and I generate watermarks with 'assignAscendingTimestamps'.

Consider the following records, for example. The stream is keyed by (user, place).

Record1 : user1, place1, timestamp t1

Record2 : user2, place1, timestamp 30 seconds after t1

Record3 : user1, place1, timestamp within 30 seconds of t1

Record4 : user1, place1, timestamp 30 seconds after t1

Record2 belongs to user2, so because the stream is keyed it belongs to a different bucket. I therefore expected Record1, Record3 and Record4 to end up in one bucket and Record2 in another:

Bucket1

Record1 : user1, place1, timestamp t1

Record3 : user1, place1, timestamp within 30 seconds of t1

Record4 : user1, place1, timestamp 30 seconds after t1

Bucket2

Record2 : user2, place1, timestamp 30 seconds after t1
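The grouping expected above can be sketched as a small Java simulation. This is a hypothetical illustration, not Flink code: each record opens a window [ts, ts + gap), and records of the same (user, place) key whose windows overlap are merged into one session. Timestamps are in milliseconds with t1 = 0, and Record3 is assumed to be at t1 + 10 s; the `Event` class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of per-key session grouping; not Flink code.
class SessionGrouping {
    // Hypothetical record type: name, key fields and event-time timestamp (ms).
    static final class Event {
        final String name, user, place;
        final long ts;

        Event(String name, String user, String place, long ts) {
            this.name = name;
            this.user = user;
            this.place = place;
            this.ts = ts;
        }
    }

    // The four example records, assuming t1 = 0 and Record3 at t1 + 10 s.
    static List<Event> exampleEvents() {
        return List.of(
                new Event("Record1", "user1", "place1", 0L),
                new Event("Record2", "user2", "place1", 30_000L),
                new Event("Record3", "user1", "place1", 10_000L),
                new Event("Record4", "user1", "place1", 30_000L));
    }

    // Groups records by (user, place), then splits each key's records (sorted by time)
    // into sessions: a new session starts when a record's timestamp is at or after the
    // current session's end, i.e. the latest timestamp in the session plus the gap.
    static Map<String, List<List<String>>> sessionsPerKey(List<Event> events, long gapMs) {
        Map<String, List<Event>> byKey = new LinkedHashMap<>();
        for (Event e : events) {
            byKey.computeIfAbsent(e.user + "," + e.place, k -> new ArrayList<>()).add(e);
        }
        Map<String, List<List<String>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Event>> entry : byKey.entrySet()) {
            List<Event> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(Comparator.comparingLong((Event ev) -> ev.ts));
            List<List<String>> sessions = new ArrayList<>();
            List<String> current = null;
            long currentEnd = Long.MIN_VALUE; // exclusive end of the current session
            for (Event e : sorted) {
                if (current == null || e.ts >= currentEnd) {
                    current = new ArrayList<>();
                    sessions.add(current);
                }
                current.add(e.name);
                currentEnd = Math.max(currentEnd, e.ts + gapMs);
            }
            result.put(entry.getKey(), sessions);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints {user1,place1=[[Record1, Record3, Record4]], user2,place1=[[Record2]]}
        System.out.println(sessionsPerKey(exampleEvents(), 30_000L));
    }
}
```

Record4 joins the first bucket because it is within 30 seconds of Record3, not of Record1; the key split alone keeps Record2 apart.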

As I understand it, a session window containing Record1 and Record3 should be triggered only when Record4 arrives. But when I run the code, a session containing only Record1 is triggered as soon as Record2 arrives, because Record2's timestamp is 30 seconds (the session gap) after Record1's, even though Record2 has a different key. I have gone through Flink's documentation and several session window examples I found online, but I have not been able to solve this. Is there something I am missing? Could this be caused by the ascending timestamps watermark?

Answer

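The problem is that assignAscendingTimestamps requires your timestamps to be monotonically increasing across all keys, not just within each key. The reason is that Flink cannot generate per-key watermarks: the watermark is shared by all keys, so once Record2 advances it far enough to cover user1's open session (Record1's timestamp plus the 30 second gap), that session is considered complete and fires, even though Record2 belongs to a different key.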
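The effect can be reproduced with a hypothetical Java simulation (not Flink code). A single watermark is shared by all keys and, modeled on the ascending-timestamps watermark, equals the highest timestamp seen so far minus 1 ms; a session fires once the watermark reaches its last timestamp (end - 1). Lateness handling and periodic watermark emission are deliberately left out. Records arrive in the listed order, with t1 = 0 and Record3 assumed at t1 + 10 s.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation (not Flink code): keyed session windows driven by ONE
// watermark that is shared by all keys.
class GlobalWatermarkSimulation {
    // An open session of one key, covering [start, end).
    static final class Session {
        final String key;
        long start, end;
        final List<String> members = new ArrayList<>();

        Session(String key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    // Processes records in arrival order and returns one log line per fired session.
    static List<String> run(String[] names, String[] keys, long[] ts, long gapMs) {
        List<Session> open = new ArrayList<>();
        List<String> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < names.length; i++) {
            // 1. Put the record into a session of its OWN key, merging overlapping windows.
            Session target = new Session(keys[i], ts[i], ts[i] + gapMs);
            target.members.add(names[i]);
            for (Iterator<Session> it = open.iterator(); it.hasNext(); ) {
                Session s = it.next();
                if (s.key.equals(target.key) && s.start < target.end && target.start < s.end) {
                    target.start = Math.min(target.start, s.start);
                    target.end = Math.max(target.end, s.end);
                    target.members.addAll(0, s.members);
                    it.remove();
                }
            }
            open.add(target);
            // 2. Advance the single, key-agnostic watermark: highest timestamp seen - 1.
            watermark = Math.max(watermark, ts[i] - 1);
            // 3. Fire every session of ANY key whose last timestamp (end - 1) is covered.
            for (Iterator<Session> it = open.iterator(); it.hasNext(); ) {
                Session s = it.next();
                if (s.end - 1 <= watermark) {
                    fired.add(names[i] + " fires " + s.key + " " + s.members);
                    it.remove();
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        String[] names = {"Record1", "Record2", "Record3", "Record4"};
        String[] keys = {"user1,place1", "user2,place1", "user1,place1", "user1,place1"};
        long[] ts = {0L, 30_000L, 10_000L, 30_000L};
        // Prints [Record2 fires user1,place1 [Record1]]
        System.out.println(run(names, keys, ts, 30_000L));
    }
}
```

The key check in step 1 keeps Record2 out of user1's session, but step 3 compares every session against the same watermark, so Record2's timestamp alone is enough to close user1's first session before Record3 arrives.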

Update

Since Flink cannot generate per-key watermarks, one has to generate watermarks that are valid for all elements. If the timestamps are monotonic per key but not across all keys, you have to define the maximum out-of-orderness (difference in timestamp) between any two keys. Subtracting this bound from the highest timestamp seen so far yields a valid watermark; see also BoundedOutOfOrdernessTimestampExtractor. However, be aware that if elements arrive with a larger out-of-orderness than the bound, this approach breaks as well.
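A minimal Java sketch of the bounded out-of-orderness idea, written as a standalone simulation rather than with Flink's BoundedOutOfOrdernessTimestampExtractor itself. The watermark after each record is assumed to be the highest timestamp seen so far minus the bound minus 1 ms, so a bound of 0 reproduces the ascending-timestamps watermark. The hypothetical `violations` method lists records whose timestamp is not after a watermark already emitted, i.e. records that arrive more out of order than the bound allows. The timestamps assume t1 = 0 and Record3 at t1 + 10 s; the 30 s bound is an illustrative choice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of bounded out-of-orderness watermarks; not Flink code.
class BoundedOutOfOrdernessSketch {
    // Watermark emitted after each record: highest timestamp seen so far, minus the
    // assumed maximum out-of-orderness, minus 1 ms.
    static long[] watermarks(long[] ts, long maxOutOfOrdernessMs) {
        long[] wm = new long[ts.length];
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < ts.length; i++) {
            maxTs = Math.max(maxTs, ts[i]);
            wm[i] = maxTs - maxOutOfOrdernessMs - 1;
        }
        return wm;
    }

    // Indices of records whose timestamp is not after the watermark emitted before
    // they arrived, i.e. records more out of order than the bound allows.
    static List<Integer> violations(long[] ts, long maxOutOfOrdernessMs) {
        long[] wm = watermarks(ts, maxOutOfOrdernessMs);
        List<Integer> out = new ArrayList<>();
        for (int i = 1; i < ts.length; i++) {
            if (ts[i] <= wm[i - 1]) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival order Record1..Record4, timestamps in ms.
        long[] ts = {0L, 30_000L, 10_000L, 30_000L};
        System.out.println(violations(ts, 0L));      // prints [2]: Record3 is behind
        System.out.println(violations(ts, 30_000L)); // prints []: 30 s bound covers it
    }
}
```

With a bound of 0, Record3 (index 2) arrives behind the watermark produced by Record2; a 30 s bound covers the 20 s disorder between the two keys. The trade-off is latency: a larger bound holds the watermark back, so every session fires correspondingly later.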