0
votes

The first question:

I want to understand how windows behave with respect to time. Assume I process data in 2-second windows using processing time, and I deploy the job when the current time is 10:26:25.169.

In this case, will each time window be aligned to even seconds (0, 2, 4, and so on), like below?

  • 10:26:24.000 - 10:26:26.000
  • 10:26:26.000 - 10:26:28.000

That is, I deployed the job at 10:26:25.169, but Flink aligned the windows to 2-second boundaries. Is that right?

If not, do the windows work like below?

  • 10:26:25.169 - 10:26:27.169
  • 10:26:27.169 - 10:26:29.169

Which one is correct? Does this behaviour change when I use event time instead of processing time?

The second question:

I want to keep state for each key. For that I can use a RichFlatMapFunction or a KeyedProcessFunction. But can I still manage state with these functions after applying a window? For example:

// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)

// in this case, can I manage state after the window for the same key?
ds.keyBy(keyExtractorWithTime)
  .window(new MyWindowFunction)
  .reduce(new myRedisFunction)
  .process(new MyStateFunction)
2
This, and other quirks of the window API, is covered in a tutorial in the docs: ci.apache.org/projects/flink/flink-docs-stable/learn-flink/… - David Anderson

2 Answers

1
votes

As for the first question, the windows are always aligned to full 2-second intervals, so basically as you've described:

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

There is an offset argument that allows you to control that behaviour to some extent. Although Flink only creates a window when data for it arrives, the window's startTime and endTime do not depend on when the data arrives: the data is fitted into the window, not the other way around.

More info can be found here

As for the second question, ProcessWindowFunction is a keyed function, so you can use keyed state inside it just as you would in a standard ProcessFunction.
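
To illustrate keyed state inside a window function, here is a minimal sketch. It assumes the preceding reduce step emits one Long per window and that keys are Strings; the class name CountFiredWindows and the state name "firedWindows" are made up for this example. It uses context.globalState(), which is per-key state that outlives individual windows, to count how many windows have fired for each key. It needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical example class: counts fired windows per key using keyed state.
public class CountFiredWindows
    extends ProcessWindowFunction<Long, String, String, TimeWindow> {

  // Descriptor for per-key state; the state name is an assumption.
  private final ValueStateDescriptor<Long> firedDesc =
      new ValueStateDescriptor<>("firedWindows", Long.class);

  @Override
  public void process(String key,
                      Context context,
                      Iterable<Long> reduced,
                      Collector<String> out) throws Exception {
    // globalState() is scoped to the key, not to a single window.
    ValueState<Long> fired = context.globalState().getState(firedDesc);
    Long previous = fired.value();
    long count = (previous == null) ? 1L : previous + 1L;
    fired.update(count);

    // With an incremental reduce in front, the iterable holds one element.
    Long value = reduced.iterator().next();
    out.collect(key + " windowStart=" + context.window().getStart()
        + " value=" + value + " firedSoFar=" + count);
  }
}
```

It would be passed as the second argument of reduce on the windowed stream, next to the ReduceFunction.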

0
votes

Question 1: If the offset parameter is not set, Flink by default uses integer multiples of the window size as the startTime (endTime = startTime + windowSize). So the first alternative you described is right.

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

In Flink, the startTime is calculated this way:

/**
 * Method to get the window start for a timestamp.
 *
 * @param timestamp epoch millisecond to get the window start.
 * @param offset The offset which window start would be shifted by.
 * @param windowSize The size of the generated windows.
 * @return window start
 */
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
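
As a quick check, the formula above can be applied to the timestamp from the question. This is only a sketch: the class name WindowStartDemo is hypothetical, the calendar date is an arbitrary assumption (only the time of day comes from the question), and the 1169 ms offset is chosen purely to show how an offset shifts the boundaries.

```java
import java.time.Instant;

final class WindowStartDemo {

    // Same arithmetic as the getWindowStartWithOffset method quoted above.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Assumed date; the time of day 10:26:25.169 is taken from the question.
        long ts = Instant.parse("2021-01-01T10:26:25.169Z").toEpochMilli();
        long size = 2_000L; // 2-second tumbling window

        // No offset: the window start is a multiple of the window size.
        long aligned = windowStart(ts, 0L, size);
        System.out.println(Instant.ofEpochMilli(aligned)
            + " - " + Instant.ofEpochMilli(aligned + size));

        // Offset of 1169 ms: boundaries move to x.169 on odd seconds.
        long shifted = windowStart(ts, 1_169L, size);
        System.out.println(Instant.ofEpochMilli(shifted)
            + " - " + Instant.ofEpochMilli(shifted + size));
    }
}
```

With no offset the element falls into the window starting at 10:26:24.000; with the 1169 ms offset the window starts at 10:26:25.169, which matches the second layout in the question.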

Question 2: If you want to manage state after a keyed window, the approach below may work. It lets you manage state and receive the reduce function's result for each window.

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

For more detail, see here.