Flink source functions emit watermarks, which flow to downstream operators and drive event-time behavior such as timers. An operator that consumes multiple input streams takes the minimum of its incoming watermarks as its current watermark.
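To make the "minimum of incoming watermarks" rule concrete, here is a minimal sketch in plain Scala. It does not use Flink; `MinWatermarkSketch` and `operatorWatermark` are hypothetical names used only for illustration.

```scala
object MinWatermarkSketch {
  // Hypothetical helper: an operator's watermark is the minimum over its input channels.
  def operatorWatermark(inputWatermarks: Seq[Long]): Long = inputWatermarks.min

  def main(args: Array[String]): Unit = {
    // Three upstream channels; the slowest one (700) holds the operator back.
    val inputs = Seq(1000L, 1500L, 700L)
    println(operatorWatermark(inputs)) // prints 700
  }
}
```

The operator's event time cannot pass 700 until the slowest channel advances, which is why a single stalled input can hold back timers downstream.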

keyBy splits the source stream into multiple logical streams, which are then passed to downstream operators (e.g. a process function).

For example:

driversStream.keyBy(driver => driver.Id).process(new ProcessDriversFunction)

class ProcessDriversFunction extends ..... {
   override def processElement(record: Driver, ctx: Context, out: Collector): Unit = {
      // Register an event-time timer to fire 5 seconds after the record's timestamp.
      // Timer timestamps are in milliseconds (assuming record.timestamp is epoch millis).
      ctx.timerService().registerEventTimeTimer(record.timestamp + 5000)
   }
}

Let's say there are 4 subtasks of the process function (P1, P2, P3, P4) and 100 key groups (KG1, KG2, ..., KG100), with each subtask processing 25 key groups, i.e. P1 processes KG1 to KG25, P2 processes KG26 to KG50, and so on.
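The range assignment in this scenario can be sketched as below. The arithmetic (key group index times parallelism, divided by max parallelism) is, to my understanding, how Flink assigns contiguous key-group ranges to subtasks, but treat that as an assumption. `KeyGroupSketch` and `subtaskFor` are hypothetical names, and the indices are 0-based internally.

```scala
object KeyGroupSketch {
  // Maps a 0-based key group index to a 0-based subtask index using contiguous ranges.
  def subtaskFor(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val parallelism = 4      // P1..P4 in the scenario
    val maxParallelism = 100 // KG1..KG100 in the scenario
    val ranges = (0 until maxParallelism).groupBy(subtaskFor(_, parallelism, maxParallelism))
    ranges.toSeq.sortBy(_._1).foreach { case (subtask, groups) =>
      // Print with 1-based labels to match the naming used in the question.
      println(s"P${subtask + 1}: KG${groups.min + 1} to KG${groups.max + 1}")
    }
  }
}
```

Running `main` prints one line per subtask, starting with `P1: KG1 to KG25` and ending with `P4: KG76 to KG100`.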

If there are no elements in driversStream from 5pm onward, none of P1, P2, P3, P4 receives a watermark later than 5pm, and hence no timers fire after 5pm.

Let's say that at 5pm the driver stream starts receiving records that all map to one key group, KG1, and are hence processed by one subtask of the process function (P1).

  • Does event time progress on P2, P3, and P4, given that they are not receiving any records?
  • On P1, timers registered by elements of KG1 will fire. But do the timers registered for KG2 to KG25 on P1 also fire, given that P1 is not receiving any elements for those key groups?

1 Answer

Does the event time progress on P2, P3, P4 as they are not receiving any records?

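That depends on whether there are idle instances of the source function, and whether withIdleness() is used in the watermark strategy. Watermarks are sent to every downstream subtask, not only to the one receiving records, so each of P1 to P4 takes the minimum over the watermarks of all source instances. If all instances of the source function are handling records from KG1, or if withIdleness() is used so that idle instances are excluded from that minimum, then the watermark will advance on P2, P3, and P4.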
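As a rough sketch of what using withIdleness() might look like, assuming Flink 1.11 or later on the classpath: the `Driver` case class, the 5-second out-of-orderness bound, and the 1-minute idle timeout are all illustrative assumptions, not values from the question.

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Hypothetical record type; the question only shows that Driver has an Id and a timestamp.
case class Driver(Id: String, timestamp: Long)

object IdleAwareWatermarks {
  val strategy: WatermarkStrategy[Driver] =
    WatermarkStrategy
      // Tolerate events arriving up to 5 seconds out of order (assumed value).
      .forBoundedOutOfOrderness[Driver](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Driver] {
        override def extractTimestamp(element: Driver, recordTimestamp: Long): Long =
          element.timestamp
      })
      // Mark a source instance idle after 1 minute without records (assumed value),
      // so it no longer holds back the downstream minimum watermark.
      .withIdleness(Duration.ofMinutes(1))
}
```

The strategy would then be passed to the source, for example via `assignTimestampsAndWatermarks(IdleAwareWatermarks.strategy)` on the stream before the keyBy.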

... do the timers registered for KG2 to KG25 on P1 get fired ... ?

Yes, they do. The current watermark applies to the whole subtask, regardless of keys or key groups, so any registered timer whose timestamp is at or before the watermark fires.
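A toy model in plain Scala may help here. It is not Flink's implementation; `SubtaskSketch` and its methods are hypothetical. It only illustrates the idea that timers are stored per key while a single watermark is shared by the whole subtask.

```scala
import scala.collection.mutable

// Toy model of one subtask: timers are kept per key, but there is one shared watermark.
class SubtaskSketch {
  private val timers = mutable.Map.empty[String, mutable.Set[Long]]
  private var currentWatermark = Long.MinValue

  def registerEventTimeTimer(key: String, timestamp: Long): Unit = {
    timers.getOrElseUpdate(key, mutable.Set.empty[Long]) += timestamp
    ()
  }

  // Advances the watermark and returns (key, timestamp) for every timer that is now due,
  // no matter which key caused the watermark to advance.
  def advanceWatermark(watermark: Long): Seq[(String, Long)] = {
    currentWatermark = math.max(currentWatermark, watermark)
    val fired = for {
      (key, stamps) <- timers.toSeq
      ts <- stamps.toSeq if ts <= currentWatermark
    } yield (key, ts)
    fired.foreach { case (key, ts) => timers(key) -= ts } // each timer fires once
    fired.sortBy(_._2)
  }
}

object SubtaskSketchDemo {
  def main(args: Array[String]): Unit = {
    val p1 = new SubtaskSketch
    p1.registerEventTimeTimer("driver-in-KG2", 5000L) // registered before the stream went quiet
    p1.registerEventTimeTimer("driver-in-KG1", 9000L)
    // A watermark driven only by KG1 records still fires the KG2 timer.
    println(p1.advanceWatermark(6000L))
  }
}
```

In this sketch the timer for the KG2 key fires once the watermark reaches 6000, even though only KG1 records moved the watermark forward; the KG1 timer at 9000 stays pending.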