I have a Flink job that aggregates data using keyed tumbling event-time windows with a watermark strategy.
My question is: does Flink keep state for windows that have already closed? Otherwise I have no explanation for why an event that belongs to a window that was never opened before opens a new window instead of being dropped immediately.
Our windows are 1 hour long and `forBoundedOutOfOrderness` is set to 10 minutes.
Let's take an example:
- event1 = ("2022-01-01T08:25:00Z") => window fired
- event2 = ("2022-01-01T09:25:00Z") => window created but not fired, as expected
- event3 = ("2022-01-01T05:25:00Z") => aggregated with event4 instead of being dropped (why?)
- event4 = ("2022-01-01T05:40:00Z") => aggregated with event3 instead of being dropped (why?)
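To check my expectation, here is a minimal single-partition model of the arithmetic I have in mind. It assumes a window offset of 0, no allowed lateness, and that a watermark has been emitted after every earlier event was read (in the real job watermarks are emitted periodically). The watermark is the highest timestamp seen minus the 10-minute bound minus 1 ms, and an element counts as late when the last timestamp of its window is already at or below the watermark. Under these assumptions event3 and event4 come out as late, because their window [05:00, 06:00) ends long before the watermark of 09:14:59.999 produced by event2.

```scala
import java.time.Instant

// Single-partition model of the example: 1-hour tumbling windows, offset 0,
// 10-minute bounded out-of-orderness, allowed lateness 0.
object WindowLatenessSketch {
  val windowSizeMs: Long = 60L * 60 * 1000
  val outOfOrdernessMs: Long = 10L * 60 * 1000

  // Start of the 1-hour window containing ts (window offset assumed to be 0)
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSizeMs)

  // Bounded-out-of-orderness watermark: highest timestamp seen - bound - 1 ms
  def watermark(maxTs: Long): Long = maxTs - outOfOrdernessMs - 1

  // Late if the window's last timestamp (end - 1 ms) is already <= the watermark.
  // Only the element's timestamp and the current watermark are used here;
  // nothing about previously fired windows is consulted.
  def isLate(ts: Long, currentWatermark: Long): Boolean =
    windowStart(ts) + windowSizeMs - 1 <= currentWatermark

  def main(args: Array[String]): Unit = {
    val events = Seq(
      "event1" -> "2022-01-01T08:25:00Z",
      "event2" -> "2022-01-01T09:25:00Z",
      "event3" -> "2022-01-01T05:25:00Z",
      "event4" -> "2022-01-01T05:40:00Z"
    )
    var maxTs = Long.MinValue
    for ((name, iso) <- events) {
      val ts = Instant.parse(iso).toEpochMilli
      // Assumes a watermark was emitted after every earlier event was read
      val wm = if (maxTs == Long.MinValue) Long.MinValue else watermark(maxTs)
      val start = Instant.ofEpochMilli(windowStart(ts))
      println(s"$name window starting $start late=${isLate(ts, wm)}")
      maxTs = math.max(maxTs, ts)
    }
  }
}
```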
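```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// EnrichedProcess, KafkaHeaders, LogMessage, ReduceFunc, kafkaConnector and the
// asDate/asEpoch helpers are project-specific and not shown here.
val stream = env
  .fromSource(
    kafkaSource,
    WatermarkStrategy
      .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](
        Duration.ofMinutes(outOfOrdernessMinutes) // maximum expected out-of-orderness
      )
      .withIdleness(Duration.ofSeconds(idleness)) // mark a split idle after this long without records
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
        override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long): Long = {
          logger.info(
            LogMessage(
              element._3.orgId,
              s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
              element._3.flowId
            )
          )
          // Event time comes from the update_time header, not the Kafka record timestamp
          element._3.updateTime.asEpoch
        }
      }),
    s"Source - $kConsumeTopic"
  )

stream
  .keyBy(element => (element._2.orgId -> element._2.procUid))
  .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
  .reduce(new ReduceFunc)
  .name("Aggregated EnrichedProcess")
  .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
  .name(s"Sink -> $kProduceTopic")
```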
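The values of `tumblingWindowHours` and `windowStartingOffsetMinutes` are not shown, so whether event3 and event4 even share a window depends on them. The sketch below reproduces the window-start arithmetic for a tumbling window with an offset, start = ts - ((ts - offset) mod size), which as far as I can tell is what Flink's `TimeWindow.getWindowStartWithOffset` computes. The 1-hour size and the offsets of 0 and 30 minutes are illustrative assumptions: with offset 0 both events land in the window starting at 05:00, while with a 30-minute offset they would fall into [04:30, 05:30) and [05:30, 06:30).

```scala
import java.time.{Duration, Instant}

object WindowStartSketch {
  // start = ts - ((ts - offset) mod size), using a floor modulo so the
  // result is also correct for timestamps before the offset
  def windowStart(ts: Long, sizeMs: Long, offsetMs: Long): Long =
    ts - Math.floorMod(ts - offsetMs, sizeMs)

  def main(args: Array[String]): Unit = {
    val size = Duration.ofHours(1).toMillis
    // Illustrative offsets only; the job's real offset is a config value
    for (offsetMinutes <- Seq(0L, 30L); iso <- Seq("2022-01-01T05:25:00Z", "2022-01-01T05:40:00Z")) {
      val ts = Instant.parse(iso).toEpochMilli
      val start = windowStart(ts, size, Duration.ofMinutes(offsetMinutes).toMillis)
      println(s"offset=${offsetMinutes}m $iso -> [${Instant.ofEpochMilli(start)}, ${Instant.ofEpochMilli(start + size)})")
    }
  }
}
```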
Edit: I'm testing this with integration tests running on Docker Compose. I produce an event to Kafka, the Flink job consumes it and sinks the result to Kafka, and then I check the contents of the output topic.
When I put a 30-second sleep between sends, event3 and event4 are dropped. This is the behaviour I was expecting.
```scala
val producer = new Producer(producerTopic)
val consumer = new Consumer(consumerTopic, groupId)

producer.send(event1)
producer.send(event2)
Thread.sleep(30000)
producer.send(event3)
Thread.sleep(30000)
producer.send(event4)

val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()
```
What puzzles me even more is that when I sleep for 10 seconds instead of 30, I only get the first behaviour (event3 and event4 are aggregated instead of dropped). The watermark should already have been updated by then, since the default interval of the periodic watermark generator is 200 ms.
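One thing I'm unsure about is how the source combines watermarks across Kafka partitions. The sketch below is a simplified model, not Flink's implementation, assuming the input topic has more than one partition and that only one of them receives the test events: the combined watermark is the minimum over partitions that are not marked idle, so an empty partition holds it back until the `withIdleness` timeout marks it idle. The partition count, the `PartitionState` type, and an `idleness` value somewhere between 10 and 30 seconds are all assumptions; none of them is stated above.

```scala
import java.time.Instant

// Simplified model (not Flink's implementation) of a source watermark combined
// across Kafka partitions, with 10-minute bounded out-of-orderness.
object PartitionWatermarkSketch {
  val outOfOrdernessMs: Long = 10L * 60 * 1000

  // Hypothetical per-partition state: highest event timestamp seen so far
  // (None if nothing was read yet) and whether the partition is marked idle.
  final case class PartitionState(maxTs: Option[Long], idle: Boolean)

  // A partition that has not delivered any record contributes Long.MinValue
  def partitionWatermark(p: PartitionState): Long =
    p.maxTs.map(_ - outOfOrdernessMs - 1).getOrElse(Long.MinValue)

  // Minimum over non-idle partitions; None when every partition is idle
  def combinedWatermark(parts: Seq[PartitionState]): Option[Long] = {
    val active = parts.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(partitionWatermark).min)
  }

  def main(args: Array[String]): Unit = {
    val withData = PartitionState(Some(Instant.parse("2022-01-01T09:25:00Z").toEpochMilli), idle = false)
    // Before the idleness timeout the empty partition still counts
    val before = combinedWatermark(Seq(withData, PartitionState(None, idle = false)))
    // After the timeout it is excluded and the watermark can advance
    val after = combinedWatermark(Seq(withData, PartitionState(None, idle = true)))
    println(s"before idleness: $before")
    println(s"after idleness:  ${after.map(ms => Instant.ofEpochMilli(ms))}")
  }
}
```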