I am trying to count the elements of a stream using a different window size per element (the window size is carried in the stream data itself), so I use a custom `WindowAssigner` and `AggregateFunction`. However, the state grows huge (window sizes range from one hour to 30 days).
My understanding is that aggregate state stores only the intermediate result (the accumulator), not the elements themselves.
Is something wrong with my approach?
/**
 * Processing-time sliding window assigner whose window size is read from each element.
 *
 * <p>Each element's window size is {@code element.getTime()} converted with a factor of 60,000
 * ms (i.e. the value is treated as minutes — NOTE(review): unit inferred from the conversion
 * factor, confirm against {@code Element}). Windows slide by a fixed step, 10 seconds by
 * default.
 *
 * <p><b>State size:</b> every element is assigned to roughly {@code size / slide} overlapping
 * windows (see the loop in {@link #assignWindows}). Flink keeps window state per key and per
 * window, so even an incremental {@code AggregateFunction} holds one accumulator for each of
 * those windows. With a 10-second slide, a 30-day window means 259,200 windows per element and
 * key, which is why state grows so large. Use {@link #ElementProcessingTime(long)} with a
 * coarser slide to bound the number of windows.
 */
public class ElementProcessingTime extends WindowAssigner<Element, TimeWindow> {

    /** Default slide step; matches the previously hard-coded 10 seconds. */
    private static final long DEFAULT_SLIDE_MILLIS = Time.seconds(10).toMilliseconds();

    /** Conversion factor for {@code element.getTime()}; a long literal forces 64-bit math. */
    private static final long MILLIS_PER_MINUTE = 60_000L;

    /** Distance between the start times of consecutive windows, in milliseconds. */
    private final long slideMillis;

    /** Creates an assigner with the default 10-second slide. */
    public ElementProcessingTime() {
        this(DEFAULT_SLIDE_MILLIS);
    }

    /**
     * Creates an assigner with a custom slide.
     *
     * @param slideMillis slide between window starts in milliseconds; larger values mean fewer
     *     overlapping windows (and less state) per element
     * @throws IllegalArgumentException if {@code slideMillis} is not positive
     */
    public ElementProcessingTime(long slideMillis) {
        if (slideMillis <= 0) {
            throw new IllegalArgumentException("slideMillis must be positive, got " + slideMillis);
        }
        this.slideMillis = slideMillis;
    }

    /**
     * Assigns the element to every sliding window of its own size that contains the current
     * processing time.
     *
     * @param element the element; its {@code getTime()} determines the window size
     * @param timestamp the element timestamp (ignored: this is a processing-time assigner)
     * @param context used to read the current processing time
     * @return the windows {@code [start, start + size)} with {@code start} aligned to the slide
     * @throws IllegalArgumentException if the element yields a negative window size
     */
    @Override
    public Collection<TimeWindow> assignWindows(
            Element element, long timestamp, WindowAssignerContext context) {
        long now = context.getCurrentProcessingTime();
        // Multiply in long arithmetic: in int arithmetic, sizes above ~35,791 minutes (~24.8 days)
        // overflow, so a 30-day window would wrap to a negative size.
        long size = element.getTime() * MILLIS_PER_MINUTE;
        if (size < 0) {
            throw new IllegalArgumentException(
                    "Window size must be non-negative, got " + element.getTime() + " minutes");
        }

        // Windows whose start is at or before this bound have already ended by `now`.
        long earliestExclusiveStart = now - size;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slideMillis));
        long lastStart = TimeWindow.getWindowStartWithOffset(now, 0, slideMillis);
        for (long start = lastStart; start > earliestExclusiveStart; start -= slideMillis) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    /**
     * Returns the custom trigger used when none is specified on the windowed stream.
     *
     * <p>NOTE(review): the class extends {@code WindowAssigner<Element, TimeWindow>}, but this
     * override returns {@code Trigger<FactorCalDetail, TimeWindow>}; this only type-checks if the
     * two element types line up — confirm and align with {@code ElementTimeTrigger}.
     */
    @Override
    public Trigger<FactorCalDetail, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ElementTimeTrigger.create();
    }

    /** Returns the serializer for {@link TimeWindow} keys in window state. */
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    /** Returns {@code false}: windows are based on processing time, not event time. */
    @Override
    public boolean isEventTime() {
        return false;
    }
}
/**
 * Incrementally counts the elements of a window.
 *
 * <p>The accumulator is a single {@code AggregateResult} holding the count, the group key and
 * the time span of the most recently added element. Only this accumulator is kept per window,
 * not the elements themselves.
 */
public class CountAggregate implements AggregateFunction<FactorCalDetail, AggregateResult, AggregateResult> {

    /** Creates an empty accumulator: count 0.0 and no key set yet. */
    @Override
    public AggregateResult createAccumulator() {
        AggregateResult result = new AggregateResult();
        result.setResult(0.0);
        return result;
    }

    /**
     * Adds one element to the accumulator.
     *
     * @param value the incoming element; its group key and time span are copied over
     * @param accumulator the window's accumulator (mutated in place)
     * @return the same accumulator
     */
    @Override
    public AggregateResult add(FactorCalDetail value, AggregateResult accumulator) {
        accumulator.setKey(value.getGroupKey());
        // NOTE(review): presumably increments the count by one — confirm in AggregateResult.
        accumulator.addResult();
        // Overwritten on every add, so the last element's time span is the one kept.
        accumulator.setTimeSpan(value.getTimeSpan());
        return accumulator;
    }

    /** Returns the accumulator itself as the window result (no copy is made). */
    @Override
    public AggregateResult getResult(AggregateResult accumulator) {
        return accumulator;
    }

    /**
     * Merges two accumulators, summing their counts when their keys match.
     *
     * @return {@code a} with {@code b}'s count added when the keys are equal; {@code b} when
     *     {@code a} has no key yet
     */
    @Override
    public AggregateResult merge(AggregateResult a, AggregateResult b) {
        // createAccumulator() never sets a key, so an accumulator that has not seen an element has
        // a null key; calling a.getKey().equals(...) on it threw NullPointerException.
        // Assumes getGroupKey() is never null, so a null key means `a` is still empty — confirm.
        if (a.getKey() == null) {
            return b;
        }
        if (a.getKey().equals(b.getKey())) {
            a.setResult(a.getResult() + b.getResult());
        }
        // NOTE(review): when the keys differ, b's count is silently discarded; confirm that this
        // cannot happen for a keyed stream or that dropping is intended.
        return a;
    }
}
// Pipeline: key the stream by Element::getKey, assign each element to sliding processing-time
// windows whose size comes from the element (ElementProcessingTime), count elements per window
// incrementally with CountAggregate, and hand each window result to RedisCustomizeSink
// (presumably writing to Redis using redisProperties — confirm).
// NOTE(review): ElementProcessingTime uses a 10-second slide, so each element lands in about
// size / 10 s windows. For a 30-day size that is ~259,200 windows, and Flink keeps one
// accumulator per (key, window), which explains the large state even with an AggregateFunction.
// A coarser slide bounds this.
env.addSource(source)
.keyBy(Element::getKey)
.window(new ElementProcessingTime())
.aggregate(new CountAggregate())
.addSink(new RedisCustomizeSink(redisProperties));