I am experimenting with Flink for some network-monitoring work. My goal is to count distinct dst_ip per src_ip.
The following code works, but the performance is really bad: each sliding window seems to recalculate all of the events, which should not be necessary.
For example, suppose we have events during seconds 1-600. Flink could keep one accumulator per second, so we would have 600 per-second accumulators. When the first sliding window fires, Flink would only need to merge the accumulators for seconds 1-300 and then destroy the accumulator for second 1 (it could even pre-merge seconds 1-299 before the last second arrives). When the second sliding window fires, it merges the accumulators for seconds 2-301 and destroys the accumulator for second 2, and so on.
This would be much more efficient than assigning each event to multiple windows and computing the aggregation per window from scratch.
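To make the idea concrete, here is a minimal sketch in plain Java (independent of Flink; the class and method names are my own, purely illustrative) of the per-second-pane scheme described above: one HashSet per second, and on every slide the oldest pane is destroyed and the remaining panes are merged instead of re-scanning raw events.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of pane-based sliding count-distinct for one src_ip key.
class SlidingDistinctCounter {
    private final int windowSeconds;                       // sliding window length in panes
    private final Deque<Set<String>> panes = new ArrayDeque<>();
    private Set<String> current = new HashSet<>();         // accumulator for the current second

    SlidingDistinctCounter(int windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    // Record one dst_ip observed during the current second.
    void add(String dstIp) {
        current.add(dstIp);
    }

    // Close the current one-second pane and emit the distinct count over the
    // last `windowSeconds` panes; only the pane that slid out is destroyed.
    long closePaneAndCount() {
        panes.addLast(current);
        current = new HashSet<>();
        while (panes.size() > windowSeconds) {
            panes.removeFirst();       // destroy the expired per-second accumulator
        }
        Set<String> merged = new HashSet<>();
        for (Set<String> pane : panes) {
            merged.addAll(pane);       // merge the surviving per-second accumulators
        }
        return merged.size();
    }
}
```

Each slide costs a merge over at most `windowSeconds` small sets rather than a pass over every raw event in the window.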
Does Flink support this? If not, can I implement something similar myself on top of Flink?
Thanks a lot!
public static class AverageAccumulator2 {
    String key;
    Set<String> target;

    AverageAccumulator2() {
        target = new HashSet<>();
    }
}

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
    @Override
    public AverageAccumulator2 createAccumulator() {
        return new AverageAccumulator2();
    }

    @Override
    public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
        accumulator.key = value.get("value").get("src_ip").asText();
        accumulator.target.add(value.get("value").get("dst_ip").asText());
        return accumulator;
    }

    @Override
    public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
        return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
    }

    @Override
    public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
        if (a.key == null) {
            a.key = b.key; // keep the key when `a` has not seen any event yet
        }
        a.target.addAll(b.target);
        return a;
    }
}
final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>>> process2 =
        stream.keyBy(value -> value.get("value").get("src_ip").asText()) // same field the aggregator reads
              .timeWindow(Time.seconds(300), Time.seconds(1))
              .aggregate(new Average2());