
I am trying out Flink for some network monitoring work. My goal is to count the distinct dst_ip values per src_ip.

The code below works, but performance is very poor. It seems that each sliding window recalculates over all of its events, which should not be necessary.

For example, suppose we have events in seconds 1 to 600. Flink could keep one accumulator per second, giving 600 accumulators. When the first sliding window expires, Flink would only need to merge the accumulators for seconds 1-300 and then discard the accumulator for second 1. It could also pre-merge seconds 1-299 before the last second arrives. When the second sliding window expires, Flink would merge the accumulators for seconds 2-301 and discard the accumulator for second 2, and so on.

This would be much more efficient than assigning each event to multiple windows and computing the aggregation separately for each window.

Does Flink support this? If not, can I implement something similar myself in Flink?

Thanks a lot!

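// Imports assumed by the snippet below. The ObjectNode import is a guess: depending on
// how the source deserializes records, it may be Flink's shaded Jackson class instead.
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Accumulator: the src_ip key plus the set of distinct dst_ip values seen so far.
public static class AverageAccumulator2 {
    String key;
    Set<String> target;
    AverageAccumulator2() {
        target = new HashSet<>();
    }
}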

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
    @Override
    public AverageAccumulator2 createAccumulator() {
        return new AverageAccumulator2();
    }

    @Override
    public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
        accumulator.key = value.get("value").get("src_ip").asText();
        accumulator.target.add(value.get("value").get("dst_ip").asText());
        return accumulator;
    }
    @Override
    public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
        return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
    }

    @Override
    public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
        // Keep the key even if only b has seen an element so far.
        if (a.key == null) {
            a.key = b.key;
        }
        a.target.addAll(b.target);
        return a;
    }
}

// Key by src_ip, then aggregate over a 300-second window that slides every second.
final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>>> process2 =
        stream.keyBy(value -> value.get("value").get("src_ip").asText())
                .timeWindow(Time.seconds(300), Time.seconds(1))
                .aggregate(new Average2());

1 Answer


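As you have observed, Flink does not attempt to optimize sliding windows: each event is assigned to every window that covers it, and each window is aggregated independently. This becomes very expensive with fine-grained slides, such as a 300-second window that slides every second.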
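To make the cost concrete, here is a minimal sketch in plain Java (no Flink dependency) of the usual sliding-window assignment arithmetic. It assumes windows are aligned to multiples of the slide with no offset; the class and method names are made up for illustration and are not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignment {
    /** Start times (ms) of every sliding window that contains the given timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (assumption: zero offset).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 300-second window sliding every second, as in the question.
        List<Long> starts = windowStarts(600_500L, 300_000L, 1_000L);
        System.out.println(starts.size() + " windows, newest start " + starts.get(0)
                + ", oldest start " + starts.get(starts.size() - 1));
        // prints "300 windows, newest start 600000, oldest start 301000"
    }
}
```

With these settings every event lands in 300 windows, so the aggregate's add method runs 300 times per event.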

What you can do is implement your own logic for handling state and timers using a ProcessFunction on the keyed stream, just as you have outlined. It would have a processElement method that, for each incoming record, updates the data structures you use for accumulating results, and an onTimer method that fires once a second, merges the partial results together, and sends the results downstream.
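The per-key bookkeeping behind that approach might look roughly like the sketch below. It is plain Java with no Flink dependency, so it only models the logic: the class name PaneDistinctCounter and its methods are hypothetical, and in a real job the map would live in Flink keyed state and onTimer would be driven by a registered timer. It also assumes timers fire in increasing time order.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

/** State for one src_ip: one set of dst_ip values per one-second pane. */
class PaneDistinctCounter {
    private final long windowSeconds;
    // pane (epoch second) -> distinct dst_ip values seen during that second
    private final TreeMap<Long, Set<String>> panes = new TreeMap<>();

    PaneDistinctCounter(long windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    /** Counterpart of processElement: touch only the pane of the event's second. */
    void processElement(long eventSecond, String dstIp) {
        panes.computeIfAbsent(eventSecond, s -> new HashSet<>()).add(dstIp);
    }

    /**
     * Counterpart of onTimer, called once per second. Returns the distinct count for
     * the window [endSecond - windowSeconds, endSecond) and drops expired panes.
     */
    long onTimer(long endSecond) {
        long startSecond = endSecond - windowSeconds;
        // Panes before the window start are not needed by this or any later window.
        panes.headMap(startSecond, false).clear();
        Set<String> merged = new HashSet<>();
        for (Set<String> pane : panes.subMap(startSecond, true, endSecond, false).values()) {
            merged.addAll(pane);
        }
        return merged.size();
    }

    /** Number of panes currently held in state. */
    int paneCount() {
        return panes.size();
    }

    public static void main(String[] args) {
        PaneDistinctCounter counter = new PaneDistinctCounter(300);
        counter.processElement(1, "10.0.0.1");
        counter.processElement(1, "10.0.0.2");
        counter.processElement(200, "10.0.0.1");
        System.out.println(counter.onTimer(301)); // window over seconds 1-300, prints 2
        System.out.println(counter.onTimer(302)); // window over seconds 2-301, prints 1
    }
}
```

Each event is written to exactly one pane instead of 300 windows. This version still merges up to 300 panes on every firing; the pre-merging idea from the question would reduce that further, at the cost of more bookkeeping.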