I have a stream of Booking elements of the following form:
Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)
I need to find, per location, count of bookings made in last 15mins. But the window should be evaluated for any new booking coming for a location.
Roughly like:
Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)
Except that the trigger function should not be evaluated
at the end of 15mins but instead whenever any booking arrives at a location
, and emit the count of booking in last 15min from timestamp of newly arrived booking
.
Approach:
Use RichMap function, maintain a priority queue of location bookings as a managed state(ValueState) with timestamp as priority of bookings. For each element that arrives, first add it to state and remove elements earlier than 15mins from currently arrived elements. Emit the count of remaining elements in priority queue to collector.
Is this the right way or it could be achieved by using some other flink construct in a better way.