I have a stream of Booking elements of the following form:

Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)

For each location, I need the count of bookings made in the last 15 minutes. However, the window should be evaluated whenever a new booking arrives for that location.

Roughly like:

Assuming the `time` field is set as the record's timestamp:
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)

Except that the trigger function should not fire at the end of the 15 minutes. Instead, it should fire whenever a booking arrives for a location, and emit the count of bookings in the 15 minutes preceding the timestamp of the newly arrived booking.

Approach:

Use a RichMapFunction and keep a priority queue of the location's bookings in managed state (ValueState), ordered by booking timestamp. For each arriving element, first add it to the queue, then remove every element more than 15 minutes older than the newly arrived one. Emit the number of elements remaining in the priority queue to the collector.
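To make the intended behaviour concrete, here is a minimal plain-Java sketch of the per-location logic with no Flink dependency. The class name `TrailingBookingCounter` and method `onBooking` are made up for illustration, the `HashMap` only stands in for keyed state, and I am assuming that a booking exactly 15 minutes old still counts and that bookings arrive roughly in timestamp order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch: per-location booking count over a trailing window. */
final class TrailingBookingCounter {
    private final long windowMillis;

    // Stand-in for keyed state: one min-heap of booking timestamps per location.
    private final Map<String, PriorityQueue<Long>> queues = new HashMap<>();

    TrailingBookingCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records a booking and returns the count in the window ending at its timestamp. */
    int onBooking(String location, long timestampMillis) {
        PriorityQueue<Long> queue =
                queues.computeIfAbsent(location, k -> new PriorityQueue<>());
        queue.add(timestampMillis);

        // Assumed boundary rule: evict only bookings strictly older than the cutoff.
        long cutoff = timestampMillis - windowMillis;
        while (!queue.isEmpty() && queue.peek() < cutoff) {
            queue.poll();
        }
        return queue.size();
    }
}
```

Calling `onBooking` once per incoming element and emitting the returned value gives one count per booking, which is the output I am after.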

Is this the right way, or could it be achieved in a better way with some other Flink construct?

1 Answer

If you are running on the heap-based state backend, what you propose should behave reasonably well. But with RocksDB you will have to go through serialization/deserialization of the priority queue for every access, which may be rather painful.

An approach that might perform better on RocksDB would be to keep the current count along with the earliest timestamp in ValueState, and the set of bookings in ListState. The RocksDB state backend can append to ListState without going through ser/de, so you would only have to deserialize and reserialize the whole list when the earliest element is too old.
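A rough sketch of that bookkeeping in plain Java, with no Flink dependency, might look like the following. The class `LocationWindowState` and its members are hypothetical names: the `count` and `earliest` fields stand in for what would live in ValueState, the list stands in for ListState, and the `rewrites` counter exists only to show how rarely the full list is touched. It assumes one instance per location, bookings arriving roughly in timestamp order, and a booking exactly 15 minutes old still counting.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the state kept for a single location. */
final class LocationWindowState {
    private final long windowMillis;

    // These two values would live together in ValueState.
    private long count = 0;
    private long earliest = Long.MAX_VALUE;

    // This would be ListState; appending is the cheap operation.
    private List<Long> bookings = new ArrayList<>();

    // Illustration only: how often the whole list had to be rewritten.
    private int rewrites = 0;

    LocationWindowState(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records a booking and returns the count in the window ending at its timestamp. */
    long onBooking(long timestampMillis) {
        bookings.add(timestampMillis); // append only, no full read of the list
        count++;
        earliest = Math.min(earliest, timestampMillis);

        long cutoff = timestampMillis - windowMillis;
        if (earliest < cutoff) {
            // Only now read the whole list, drop expired entries, and write it back.
            List<Long> kept = new ArrayList<>();
            long newEarliest = Long.MAX_VALUE;
            for (long t : bookings) {
                if (t >= cutoff) {
                    kept.add(t);
                    newEarliest = Math.min(newEarliest, t);
                }
            }
            bookings = kept;
            count = kept.size();
            earliest = newEarliest;
            rewrites++;
        }
        return count;
    }

    int rewritesSoFar() {
        return rewrites;
    }
}
```

In an actual job the append would correspond to `ListState.add` and the rewrite to `ListState.update`, so the expensive path runs only when the earliest stored booking has fallen out of the window.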