This is my first question here. I have a question about Apache Flink. To measure Flink's latency at each level of parallelism, we want to sum, over all windows, the time between when a window is created and when it is emitted.

Latency is calculated using System.currentTimeMillis() and a public long start_time field that is set in the constructor of my window class (MyTimeWindow), with a watermark-based trigger. However, the measured value is very large compared to the window width. My guess at the cause is that Flink reuses window objects. What is actually going on?

Also, even if I assign a new value to the window's start_time from inside the trigger, the value does not change. What should I do about this?

Also, if there is a better way to measure the lifetime of a window, I would be grateful to hear it. (If Flink is reusing windows, the start_time of a discarded window would be used again.)

private static class MyTrigger extends Trigger<Tuple2<Integer,Integer>,MyTimeWindow>{
File ratency_file;
MyTrigger(){
  super();
  try{
    ratency_file = new File(ratency_filePath);
    FileWriter filewriter = new FileWriter(ratency_file,false);
    filewriter.close();
  }catch(IOException e){
    System.out.println("IOException");
  }
}
@Override
public TriggerResult onElement(Tuple2<Integer,Integer> element, long timestamp, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
  if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
    // if the watermark is already past the window, fire immediately
    int index = (int)(System.currentTimeMillis()-window.start_time);
    FileWriter filewriter = new FileWriter(ratency_file,true);
    filewriter.write(Integer.toString(index)+"\n");
    filewriter.close();
    return TriggerResult.FIRE_AND_PURGE;
  } else {
    ctx.registerEventTimeTimer(window.maxTimestamp());
    return TriggerResult.CONTINUE;
  }
}
@Override
public TriggerResult onEventTime(long time, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
  TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE;
  if( res == TriggerResult.FIRE_AND_PURGE ){
    int index = (int)(System.currentTimeMillis()-window.start_time);
    FileWriter filewriter = new FileWriter(ratency_file,true);
    filewriter.write(Integer.toString(index)+"\n");
    filewriter.close();
  }
  return res;
}
@Override
public void clear(MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException{
  ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public TriggerResult onProcessingTime(long timestamp, MyTimeWindow window , Trigger.TriggerContext ctx){
  TriggerResult res = TriggerResult.CONTINUE;
  return res;
}

}

You appear to have a typo on this line: TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE; - David Anderson

1 Answer

You are using event time windows, and by computing System.currentTimeMillis()-window.start_time when the window is ready to close you are mixing together many different sources of delay.

An event time window is triggered when a Watermark arrives that passes the end of the window. Assuming a BoundedOutOfOrderness watermark generator, this can't happen until an event arrives with a timestamp greater than the end time of the window plus the delay you have configured to accommodate out-of-order events. Furthermore, the watermark won't be created immediately upon the arrival of such an event, but rather when the auto-watermark interval next expires.

Putting this all together, the latency you are measuring includes many components, which is why it can seem rather large:

  • the time elapsed between when events are timestamped and when they are ingested by Flink (because window.start_time is based on the timestamps in the events, and you are comparing this to System.currentTimeMillis())
  • the delay imposed by watermarking (to accommodate out-of-order events)
  • delays due to the auto watermarking interval (default: 200 msec)
  • delays from network buffering (default: 100 msec)
  • the window duration
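
To make the interaction between the out-of-orderness delay and the auto-watermark interval concrete, here is a minimal plain-Java sketch that does not use Flink at all. It is only a simplified model under stated assumptions: the class WindowFiringSketch and the method firingTime are hypothetical names, watermarks are assumed to be emitted at exact multiples of the interval, and network buffering is ignored.

```java
final class WindowFiringSketch {

    /**
     * Simplified model, not Flink code. Each event is {eventTime, arrivalTime},
     * sorted by arrivalTime (wall clock, in ms). Returns the wall-clock time at
     * which a window ending at windowEnd would fire, or -1 if it never fires.
     */
    static long firingTime(long[][] events, long windowEnd,
                           long maxOutOfOrderness, long watermarkInterval) {
        for (long[] e : events) {
            long eventTime = e[0];
            long arrivalTime = e[1];
            // Modeled watermark: event timestamp minus the configured delay.
            // The window cannot fire until this reaches the window end.
            if (eventTime - maxOutOfOrderness >= windowEnd) {
                // Assumption: the watermark goes out at the next periodic tick,
                // i.e. arrivalTime rounded up to a multiple of the interval.
                long ticks = (arrivalTime + watermarkInterval - 1) / watermarkInterval;
                return ticks * watermarkInterval;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[][] events = {
            {100, 1_050},    // first event of the window [0, 1000)
            {900, 1_900},
            {1_000, 2_010},  // past the window end, but not past end + delay
            {1_500, 2_530},  // first event with timestamp >= 1000 + 500
        };
        long fired = firingTime(events, 1_000, 500, 200);
        System.out.println(fired);          // prints 2600
        System.out.println(fired - 1_050);  // prints 1550
    }
}
```

In this made-up trace the window is 1000 ms wide, yet 1550 ms of wall-clock time pass between the arrival of its first event and the moment it fires, before any ingestion or network-buffering delay is counted.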