
I have a Flink job that aggregates data using keyed tumbling event-time windows with a watermark strategy.

My question is: does Flink hold state for windows that have already been closed? Otherwise, I have no explanation for why an event belonging to a window that was never opened before opens a new window instead of being dropped immediately.

Given that our windows are 1 hour long and forBoundedOutOfOrderness is 10 minutes.

Let's have an example:

event1 = ("2022-01-01T08:25:00Z") => window fired

event2 = ("2022-01-01T09:25:00Z") => window created but not fired, as expected

event3 = ("2022-01-01T05:25:00Z") => will be aggregated with event4 instead of dropped (why?)

event4 = ("2022-01-01T05:40:00Z") => will be aggregated with event3 instead of dropped (why? see the arithmetic sketch below)
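
For reference, here is the watermark arithmetic as I understand it (a sketch, assuming a 10-minute bound, 1-hour tumbling windows and no window offset; the helper names are mine, not Flink code):

    // Rough sketch of the arithmetic, not part of the job itself.
    val boundMs  = 10 * 60 * 1000L   // forBoundedOutOfOrderness(10 minutes)
    val windowMs = 60 * 60 * 1000L   // 1-hour tumbling windows, offset 0 assumed

    // forBoundedOutOfOrderness emits: watermark = maxSeenTimestamp - bound - 1
    def watermark(maxSeenTs: Long): Long = maxSeenTs - boundMs - 1

    // An event with timestamp ts falls into the window [start, start + windowMs)
    def windowStart(ts: Long): Long = ts - (ts % windowMs)

    // An event-time window fires once the watermark passes its end
    def windowFires(start: Long, currentWatermark: Long): Boolean =
      currentWatermark >= start + windowMs - 1

    // After event2 (09:25) the next periodic watermark is ~09:15, which fires the
    // [08:00, 09:00) window. event3 (05:25) should then be late -- but only if that
    // watermark has actually reached the window operator before event3 does.

The actual job looks like this: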

    val stream = env
      .fromSource(
        kafkaSource,
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
          .withIdleness(Duration.ofSeconds(idleness))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
            override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
                : Long = {
              logger.info(
                LogMessage(
                  element._3.orgId,
                  s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
                  element._3.flowId
                )
              )
              element._3.updateTime.asEpoch
            }
          }),
        s"Source - $kConsumeTopic"
      )

    stream
      .keyBy(element => (element._2.orgId -> element._2.procUid))
      .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
      .reduce(new ReduceFunc)
      .name("Aggregated EnrichedProcess")
      .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
      .name(s"Sink -> $kProduceTopic")

Edited: The way I'm testing this is with integration tests using Docker Compose. I produce an event to Kafka => it is consumed by the Flink job and written to the sink topic => I check the contents of that Kafka topic.

When I put a sleep of 30 seconds between sending the events, event3 and event4 are dropped. This is the behaviour I was expecting.

    val producer = new Producer(producerTopic)

    val consumer = new Consumer(consumerTopic, groupId)
    producer.send(event1)
    producer.send(event2)
    Thread.sleep(30000)
    producer.send(event3)
    Thread.sleep(30000)
    producer.send(event4)

    val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()

But even more curious: why, when I use a sleep of 10 seconds instead of 30, do I only get the first situation? The watermark should have been updated already (the default interval of the periodic watermark generator is 200 ms).
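
For reference, my understanding is that this interval is the autoWatermarkInterval on the execution config (200 ms being the default), e.g.:

    // Sketch: where the periodic watermark emission interval comes from.
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setAutoWatermarkInterval(200L) // milliseconds; 200 is the default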

1 Answer


While you would expect

event3 = ("2022-01-01T05:25:00Z")

to be late, it will only truly be late if a large enough watermark has managed to arrive first. With the forBoundedOutOfOrderness strategy there's no guarantee of that -- it is a periodic watermark generator that only produces watermarks every 200 ms (by default). So it could be that the watermark based on event2's timestamp hadn't yet been produced, or hadn't yet reached the window operator, by the time event3 and event4 were processed.
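
Schematically, a bounded-out-of-orderness generator behaves roughly like the sketch below (not the actual Flink source, but the shape is the same): it only records the maximum timestamp on each event, and a watermark is emitted only when the periodic timer fires.

    import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

    // Rough sketch of a bounded-out-of-orderness watermark generator.
    class BoundedOutOfOrdernessSketch[T](outOfOrdernessMs: Long) extends WatermarkGenerator[T] {
      private var maxSeenTs = Long.MinValue + outOfOrdernessMs + 1

      override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = {
        // No watermark is emitted here -- only the maximum timestamp is tracked.
        maxSeenTs = math.max(maxSeenTs, eventTimestamp)
      }

      override def onPeriodicEmit(output: WatermarkOutput): Unit = {
        // Called every autoWatermarkInterval (200 ms by default). Until this runs and
        // the watermark reaches the window operator, a "late" event can still be
        // assigned to a new window instead of being dropped.
        output.emitWatermark(new Watermark(maxSeenTs - outOfOrdernessMs - 1))
      }
    }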

That's one possible explanation; there may be others depending on the exact circumstances.

Another very relevant factor is that if the parallelism is > 1, there are multiple, independent instances of the watermark strategy, each doing its own thing based on the events it processes.

Operators with multiple input channels, such as the keyed window, will combine these watermarks by taking the minimum of the incoming watermarks as their own watermark.
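
If the integration test only needs deterministic watermark behaviour, one option is to pin the job to a single parallel instance (a sketch, assuming you configure the environment in the test):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With parallelism 1 there is a single watermark generator and a single input
    // channel to the window operator, so the min-of-inputs combination described
    // above no longer comes into play.
    env.setParallelism(1)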

(The answer to the original question -- does Flink retain the state for windows that have already been closed -- is no. Once the allowed lateness (if any) has expired, the state for an event time window is purged.)
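
If you would rather keep window state around for a while and see which events are dropped, the window operator can be given an allowed lateness and a side output for late data. A sketch reusing the names from your job (the "late-events" tag and the 10-minute lateness are illustrative choices):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    val lateTag = OutputTag[(String, EnrichedProcess, KafkaHeaders)]("late-events")

    val aggregated = stream
      .keyBy(element => (element._2.orgId -> element._2.procUid))
      .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
      .allowedLateness(Time.minutes(10))   // keep window state 10 minutes past the watermark
      .sideOutputLateData(lateTag)         // events later than that go to the side output
      .reduce(new ReduceFunc)

    // Late events can then be inspected or sunk somewhere instead of silently vanishing.
    val lateEvents = aggregated.getSideOutput(lateTag)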