Flink source functions emit watermarks, which flow to downstream operators and drive event-time logic such as timers. For operators consuming multiple input streams, the operator's current watermark is the minimum of the watermarks on its inputs.
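The min-of-inputs rule can be sketched in plain Scala as below. This is a simplified model, not Flink's implementation: `MinWatermarkTracker` and `onWatermark` are hypothetical names, and details such as idle inputs are ignored.

```scala
object WatermarkSketch {
  // Hypothetical tracker: remembers the latest watermark per input channel and
  // derives the operator's watermark as the minimum across all channels.
  final class MinWatermarkTracker(numInputs: Int) {
    private val perInput = Array.fill(numInputs)(Long.MinValue)
    private var current = Long.MinValue

    // Returns Some(newWatermark) only if the operator's watermark advanced.
    def onWatermark(input: Int, watermark: Long): Option[Long] = {
      // A channel's watermark never moves backwards.
      perInput(input) = math.max(perInput(input), watermark)
      val min = perInput.min
      if (min > current) { current = min; Some(current) } else None
    }

    def currentWatermark: Long = current
  }

  def main(args: Array[String]): Unit = {
    val tracker = new MinWatermarkTracker(2)
    println(tracker.onWatermark(0, 100L)) // None: input 1 has not sent a watermark yet
    println(tracker.onWatermark(1, 50L))  // Some(50)
    println(tracker.onWatermark(1, 200L)) // Some(100): input 0 is now the slowest
  }
}
```

The operator can only advance as fast as its slowest input, which is why one stalled input holds back event time for the whole operator.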
`keyBy` splits the source stream into multiple logical streams, which are then passed to downstream operators (e.g. a process function).
For example:

```scala
driversStream
  .keyBy(driver => driver.Id)
  .process(new ProcessDriversFunction)

class ProcessDriversFunction extends ..... {
  override def processElement(record: Driver, ctx: Context, out: Collector): Unit = {
    // Register an event-time timer to fire 5 seconds after the record's timestamp
    // (Flink timestamps are in milliseconds, so 5 seconds = 5000)
    ctx.timerService().registerEventTimeTimer(record.timestamp + 5000)
  }
}
```
Let's say there are 4 subtasks of the process function (P1, P2, P3, P4) and 100 key groups (KG1, KG2, ..., KG100), with each subtask processing 25 key groups, i.e. P1 processes KG1 to KG25, P2 processes KG26 to KG50, and so on.
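The split of 100 key groups across 4 subtasks can be reproduced with the sketch below. It assumes the arithmetic used, as far as I know, by Flink's `KeyGroupRangeAssignment` (maximum parallelism = number of key groups, 0-based key groups and subtask indices); the object and method names here are hypothetical, and the hashing of keys into key groups is omitted.

```scala
object KeyGroupSketch {
  // Which subtask (0-based) owns a given key group (0-based).
  def operatorIndexForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Inclusive range of key groups (0-based) owned by a subtask.
  def keyGroupRange(maxParallelism: Int, parallelism: Int, operatorIndex: Int): (Int, Int) = {
    val start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism
    val end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism
    (start, end)
  }

  def main(args: Array[String]): Unit = {
    val maxParallelism = 100 // 100 key groups, as in the question
    val parallelism = 4      // P1..P4
    for (op <- 0 until parallelism) {
      val (start, end) = keyGroupRange(maxParallelism, parallelism, op)
      // Print 1-based names to match the question's P1 / KG1 naming
      println(s"P${op + 1}: KG${start + 1} to KG${end + 1}")
    }
  }
}
```

Running `main` prints `P1: KG1 to KG25`, `P2: KG26 to KG50`, `P3: KG51 to KG75` and `P4: KG76 to KG100`, matching the assignment described above.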
If `driversStream` has no elements after 5pm, none of P1, P2, P3, P4 receives a watermark later than 5pm, so no timers scheduled for after 5pm will fire.
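Why a stalled watermark blocks timers can be shown with a toy event-time timer service. This is a hypothetical sketch, not Flink's API: it assumes timers fire once the watermark reaches or passes their timestamp, and unlike Flink it does not deduplicate timers or track keys.

```scala
import scala.collection.mutable

object TimerSketch {
  // Toy timer service: timers fire only when the watermark reaches their timestamp.
  final class ToyTimerService {
    // Reverse ordering so that head/dequeue return the earliest timestamp.
    private val timers = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)
    private var watermark = Long.MinValue

    def registerEventTimeTimer(timestamp: Long): Unit = timers.enqueue(timestamp)

    // Advances the watermark and returns the timestamps of the timers that fire.
    def advanceWatermark(newWatermark: Long): List[Long] = {
      watermark = math.max(watermark, newWatermark)
      val fired = mutable.ListBuffer.empty[Long]
      while (timers.nonEmpty && timers.head <= watermark) fired += timers.dequeue()
      fired.toList
    }
  }

  def main(args: Array[String]): Unit = {
    val fivePm = 17L * 60 * 60 * 1000 // 5pm as ms since midnight, for illustration only
    val service = new ToyTimerService
    service.registerEventTimeTimer(fivePm + 5000)     // "fire after 5 seconds"
    println(service.advanceWatermark(fivePm))        // List(): watermark has not passed the timer
    println(service.advanceWatermark(fivePm + 5000)) // List(61205000)
  }
}
```

If `advanceWatermark` is never called with a value past 5pm, the registered timer simply stays in the queue, which is the situation described above.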
Now suppose that, at 5pm, the driver stream starts receiving records that all map to a single key group, KG1, and are therefore processed by a single subtask of the process function, P1.
- Does event time progress on P2, P3 and P4, given that they are not receiving any records?
- On P1, timers registered by elements of KG1 will fire. But do the timers registered for KG2 to KG25 on P1 also fire, given that no elements for those key groups are arriving at P1?