0
votes

I am try to count data in stream with different window size(the size of window is in steam data), so I use custom WindowAssigner and AggregateFunction, but state is huge (window range from one hour to 30 day)

In my mind aggregate state is only store intermediate result

Is there something wrong?

public class ElementProcessingTime extends WindowAssigner<Element, TimeWindow> {
    @Override public Collection<TimeWindow> assignWindows(Element element, long timestamp, WindowAssignerContext context) {
        long slide = Time.seconds(10).toMilliseconds();
        long size = element.getTime() * 60 * 1000;
        timestamp = context.getCurrentProcessingTime();

        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    @Override public Trigger<FactorCalDetail, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ElementTimeTrigger.create();
    }

    @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override public boolean isEventTime() {
        return false;
    }
}

public class CountAggregate implements AggregateFunction<FactorCalDetail, AggregateResult, AggregateResult> {

    @Override public AggregateResult createAccumulator() {
        AggregateResult result = new AggregateResult();
        result.setResult(0.0);
        return result;
    }

    @Override public AggregateResult add(FactorCalDetail value, AggregateResult accumulator) {
        accumulator.setKey(value.getGroupKey());
        accumulator.addResult();
        accumulator.setTimeSpan(value.getTimeSpan());
        return accumulator;
    }

    @Override public AggregateResult getResult(AggregateResult accumulator) {
        return accumulator;
    }

    @Override public AggregateResult merge(AggregateResult a, AggregateResult b) {
        if (a.getKey().equals(b.getKey())) {
            a.setResult(a.getResult() + b.getResult());
        }
        return a;
    }
}

env.addSource(source)
    .keyBy(Element::getKey)
    .window(new ElementProcessingTime())
    .aggregate(new CountAggregate())
    .addSink(new RedisCustomizeSink(redisProperties));
2
If you're using keyed windows, you can use the RocksDB state backend to reduce the pressure on the heap ci.apache.org/projects/flink/flink-docs-stable/ops/state/…Yuval Itzchakov
I already use RocksDBStateBackend incrementalCheckpointing,but the state is too huge, 10,000 data is about 1g, it's intolerable (;′⌒`)jimmy
You're seeing the state in memory grow rapidly? How many nodes are you running, CPU / Memory on each?Yuval Itzchakov
test environment k8s pod 1 core 2gjimmy
Minimum Pause Between Checkpoints is 1m, the state size is grow rapidlyjimmy

2 Answers

1
votes

When you assign custom windows, the state size may quickly go out of hand. That's mainly because each window need to hold all records that fall within it until the window is aggregated and eventually evicted. In your code, it also seems like you create a huge amount of windows per record.

You didn't specify your use case, but I'm assuming that you actually want to calculate how many events stretch over a given point in time for each key with a 10 ms bin size. If so, then this is not directly a use case for windows.

What you want to do is:

  1. Split your event into smaller events.
  2. Group by key and bin.
  3. Count your bin.

Rough sketch in code:

input.flatMap(element -> {
        ...
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            emit(new KeyTime(key, start));
        }
    })
    .keyBy(keyTime -> keyTime)
    .count()

You may apply windows after the keyBy to force certain output properties, such as wait for a few minutes and then output everything and ignore late events.

Note: KeyTime is a simple POJO holding the key and the bin time.

edit: after your comment, the solution is actually much simpler.

env.addSource(source)
    .keyBy(element -> new Tuple2<>(element.getKey(), element.getTime()))
    .count()
    .addSink(new RedisCustomizeSink(redisProperties));
0
votes

You don't say what source is and that will have its own state to persist. You also don't say how many unique keys there are. Even a small amount of state per key can grow huge as the number of unique keys increases. If the problem does end up being somewhere in the growth of the aggregator state, you might try splitting the windowing logic into a series of two windows, one to aggregate hourly and a second to aggregate the hourly rollups to your desired timeframe.