For the first time to question. I have a question about Apache Flink. In order to measure the latency performance for each parallel number of Apache Flink, we want to total the time difference between when a window is created and when that window is emitted for each window.
Latency is calculated using system.currenttimemillis ()
and public long start_time
defined in the constructor of Window using Watermark Trigger, but the value is quite large for the window width.
As a cause of this weirdness, I thought that Flink was reusing the Window. What is actually going on?
Also, even if the start_time of the window is substituted on the trigger side, it has not been changed. What should this do?
Also, if you have a better way to measure the survival time of Window, I would be grateful if you could tell me.
(If the Flink is reusing the window, the start_time
of the discarded window will be used again.)
private static class MyTrigger extends Trigger<Tuple2<Integer,Integer>,MyTimeWindow>{
File ratency_file;
MyTrigger(){
super();
try{
ratency_file = new File(ratency_filePath);
FileWriter filewriter = new FileWriter(ratency_file,false);
filewriter.close();
}catch(IOException e){
System.out.println("IOException");
}
}
@Override
public TriggerResult onElement(Tuple2<Integer,Integer> element, long timestamp, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
int index = (int)(System.currentTimeMillis()-window.start_time);
FileWriter filewriter = new FileWriter(ratency_file,true);
filewriter.write(Integer.toString(index)+"\n");
filewriter.close();
return TriggerResult.FIRE_AND_PURGE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException {
TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE;
if( res == TriggerResult.FIRE_AND_PURGE ){
int index = (int)(System.currentTimeMillis()-window.start_time);
FileWriter filewriter = new FileWriter(ratency_file,true);
filewriter.write(Integer.toString(index)+"\n");
filewriter.close();
}
return res;
}
@Override
public void clear(MyTimeWindow window, Trigger.TriggerContext ctx) throws IOException{
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public TriggerResult onProcessingTime(long timestamp, MyTimeWindow window , Trigger.TriggerContext ctx){
TriggerResult res = TriggerResult.CONTINUE;
return res;
}
}
TriggerResult res = time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PUREGE : TriggerResult.CONTINUE;
– David Anderson