
I am using Flink's TimeWindow functionality to perform some computations. I am creating a 5-minute window. However, I want the very first window to be one hour long; every window after it should be 5 minutes.

That is, for the first hour data is collected and my operation is performed on it. After that, the same operation is performed every five minutes.

I figured this could be implemented with a trigger, but I am not sure which trigger to use or how.

UPDATE: I don't think triggers help either. From what I can tell, they only define time- or count-based firing within a window, not when the first window fires.

I doubt that this is possible with the current DataStream API. If you define a window, the definition is the same for all window instances that are built at runtime. The only way would be to define a custom operator and add it to your program via .transform(...), but this seems quite cumbersome to get right. - Matthias J. Sax

Answer


This is not trivial to implement.

Given a KeyedStream, you have to use a GlobalWindow and a custom stateful Trigger that "remembers" whether it has already fired for the first time.
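A minimal sketch of such a trigger follows. It assumes processing time, that the first hour is counted per key starting from that key's first element, and that each firing should purge the buffered elements; the class name `FirstHourThenFiveMinutesTrigger` and its parameters are hypothetical. The trigger keeps the timestamp of its next firing in keyed trigger state: the first element registers a timer one hour ahead, and each time that timer fires the trigger emits the window and registers the next timer 5 minutes later. An instance could be passed to `.trigger(...)` in place of `YourTrigger`.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical trigger (name and defaults are illustrative): per key, fires
// once `firstDelayMs` of processing time after the key's first element,
// then every `periodMs` after that.
class FirstHourThenFiveMinutesTrigger[T](
    firstDelayMs: Long = 60 * 60 * 1000L, // 1 hour
    periodMs: Long = 5 * 60 * 1000L       // 5 minutes
) extends Trigger[T, GlobalWindow] {

  // Trigger state, scoped per key and window: the next scheduled firing time.
  // PURGE clears the window contents but not this state.
  private val nextFireDesc =
    new ValueStateDescriptor[java.lang.Long]("nextFireTime", classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val nextFire: ValueState[java.lang.Long] = ctx.getPartitionedState(nextFireDesc)
    if (nextFire.value() == null) {
      // First element seen for this key: schedule the long first window
      val first = ctx.getCurrentProcessingTime() + firstDelayMs
      ctx.registerProcessingTimeTimer(first)
      nextFire.update(first)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    val nextFire: ValueState[java.lang.Long] = ctx.getPartitionedState(nextFireDesc)
    val scheduled = nextFire.value()
    if (scheduled != null && scheduled.longValue() == time) {
      // Schedule the next short window, then emit and drop the buffered elements
      val next = time + periodMs
      ctx.registerProcessingTimeTimer(next)
      nextFire.update(next)
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // This sketch only uses processing-time timers
  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = {
    val nextFire: ValueState[java.lang.Long] = ctx.getPartitionedState(nextFireDesc)
    val scheduled = nextFire.value()
    if (scheduled != null) ctx.deleteProcessingTimeTimer(scheduled.longValue())
    nextFire.clear()
  }
}
```

Returning `FIRE` instead of `FIRE_AND_PURGE` would keep every element in the window, so each 5-minute result would cover everything seen since the start rather than only the latest 5 minutes. For event time, the same pattern would use `registerEventTimeTimer` and `onEventTime` instead.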

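import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows

val stream: DataStream[(String, Int)] = ???
val result = stream
  .keyBy(0)                        // key by the String field
  .window(GlobalWindows.create())  // one never-ending window per key
  .trigger(new YourTrigger())      // custom trigger decides when to fire
  .apply(new YourWindowFunction()) // your computation over the buffered elements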
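For completeness, here is a hypothetical `WindowFunction` that could stand in for `YourWindowFunction`, assuming the computation is a per-key sum of the `Int` field (the class name and the sum are illustrative only). Because the stream is keyed with `keyBy(0)`, the Scala API passes the key to the function as a Flink Java `Tuple`.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical example: sums the Int field of all (String, Int) records
// buffered for this key since the trigger last fired and purged.
class SumWindowFunction
    extends WindowFunction[(String, Int), (String, Int), Tuple, GlobalWindow] {

  override def apply(key: Tuple, window: GlobalWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Int)]): Unit = {
    // keyBy(0) yields a Java Tuple whose field 0 is the String key
    val word: String = key.getField[String](0)
    out.collect((word, input.map(_._2).sum))
  }
}
```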

Details about GlobalWindow and Trigger are in the Flink Window documentation.