I'm trying to bucket the objects in an incoming stream based on two criteria:
- If the number of objects in the bucket reaches N, emit the bucket downstream.
- If the time since the last object arrived is >= timeout, emit the bucket downstream.
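To make the intended behavior concrete, here is a minimal plain-Java sketch of the two rules, without Flink. The class name `CountOrTimeoutBucketer` and its methods are hypothetical, and the sketch assumes the timeout is measured from the arrival of the most recent object:

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (no Flink) of count-or-timeout bucketing. All names are hypothetical. */
final class CountOrTimeoutBucketer<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<T> current = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private long lastArrival;

    CountOrTimeoutBucketer(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds an element that arrived at time {@code now} (milliseconds). */
    void add(T element, long now) {
        flushIfTimedOut(now);            // rule 2 may close the previous bucket first
        current.add(element);
        lastArrival = now;
        if (current.size() >= maxCount) {
            flush();                     // rule 1: the bucket holds N elements
        }
    }

    /** Timer tick without an element; applies only the timeout rule. */
    void onTimer(long now) {
        flushIfTimedOut(now);
    }

    /** Buckets emitted so far, in emission order. */
    List<List<T>> emitted() {
        return emitted;
    }

    private void flushIfTimedOut(long now) {
        if (!current.isEmpty() && now - lastArrival >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        emitted.add(new ArrayList<>(current));
        current.clear();
    }

    public static void main(String[] args) {
        CountOrTimeoutBucketer<String> b = new CountOrTimeoutBucketer<>(3, 1000L);
        b.add("a", 0L);
        b.add("b", 100L);
        b.add("c", 200L);   // third element: count rule emits [a, b, c]
        b.add("d", 300L);
        b.onTimer(1300L);   // 1000 ms since "d": timeout rule emits [d]
        System.out.println(b.emitted()); // prints [[a, b, c], [d]]
    }
}
```

In Flink terms, `add` corresponds roughly to the trigger's `onElement` and `onTimer` to `onProcessingTime`; the sketch ignores parallelism and state backends.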
Both of these are available in Flink separately, as CountTrigger and ProcessingTimeSessionWindows.
I'm trying to combine the two by writing a custom trigger and extending ProcessingTimeSessionWindows to use that trigger. It fires for the second condition but not the first. Since the stream is not a keyed stream, I can't use ValueState to store the count, so I was wondering what alternatives I have.
The code is below:
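    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class ProcessingTimeCountSessionWindow extends ProcessingTimeSessionWindows {
        private static final long serialVersionUID = 786L;

        private final int count;

        private ProcessingTimeCountSessionWindow(int count, long timeout) {
            super(timeout);
            this.count = count;
        }

        // Use the custom count-or-timeout trigger instead of the default ProcessingTimeTrigger.
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeCountTrigger.create(count);
        }

        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the current processing time.
         *
         * @param count Max count of elements in a session, i.e. the upper bound on elements per session
         * @param size The session timeout, i.e. the time gap between sessions
         * @return The policy.
         */
        public static ProcessingTimeCountSessionWindow withCountAndGap(int count, Time size) {
            return new ProcessingTimeCountSessionWindow(count, size.toMilliseconds());
        }
    }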
Flink's CountTrigger uses a ReducingState, but my stream isn't keyed, so that doesn't work. The custom trigger is below:
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.IntSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class ProcessingTimeCountTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 786L;

        private final int maxCount;

        // Per-window element counter. ReduceFunctions.IntSum is a summing reduce
        // function whose definition is not shown here.
        private final ReducingStateDescriptor<Integer> countStateDesc =
                new ReducingStateDescriptor<>("window-count", new ReduceFunctions.IntSum(), IntSerializer.INSTANCE);

        private ProcessingTimeCountTrigger(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // Timeout condition: fire when the session window ends.
            ctx.registerProcessingTimeTimer(window.maxTimestamp());

            // Count condition: fire once maxCount elements have been seen.
            ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
            count.add(1);
            if (count.get() >= maxCount) {
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // The session gap elapsed: emit the window and discard its contents.
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // Event time is not used by this trigger.
            return TriggerResult.CONTINUE;
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
            // Register a timer for the end of the merged session window.
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countStateDesc).clear();
        }

        public static ProcessingTimeCountTrigger create(int maxCount) {
            return new ProcessingTimeCountTrigger(maxCount);
        }

        @Override
        public String toString() {
            return "ProcessingTimeCountTrigger(" + maxCount + ")";
        }
    }
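One thing worth checking, offered as a possible explanation rather than a verified diagnosis: Flink's built-in CountTrigger calls `ctx.mergePartitionedState(stateDesc)` in its `onMerge`, and the trigger above does not. If trigger state is scoped to the window, and every new element causes the session window to merge into a new, longer window, a count that is not merged would start over on each element. The plain-Java model below (not Flink code; `MergingCountModel` and everything in it is hypothetical and heavily simplified) illustrates that assumption:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (not Flink code) of a per-window counter while session
 * windows merge. Assumption: a merge yields a new window whose counter is
 * empty unless the old counts are merged into it explicitly.
 */
final class MergingCountModel {
    private final long gap;
    private final boolean mergeCounts;
    private final Map<List<Long>, Integer> countByWindow = new HashMap<>();
    private List<Long> session; // current session window as [start, end), or null

    MergingCountModel(long gap, boolean mergeCounts) {
        this.gap = gap;
        this.mergeCounts = mergeCounts;
    }

    /** Processes one element arriving at {@code now}; returns the count the trigger would see. */
    int onElement(long now) {
        if (session != null && now < session.get(1)) {
            // The element's window [now, now + gap) overlaps the session: merge them.
            List<Long> merged = Arrays.asList(session.get(0), now + gap);
            if (!merged.equals(session)) {
                int carried = mergeCounts ? countByWindow.getOrDefault(session, 0) : 0;
                countByWindow.remove(session);   // state of the merged-away window is cleared
                if (carried > 0) {
                    countByWindow.put(merged, carried);
                }
                session = merged;
            }
        } else {
            // No overlap: the old session (if any) is over and a new one starts.
            if (session != null) {
                countByWindow.remove(session);
            }
            session = Arrays.asList(now, now + gap);
        }
        return countByWindow.merge(session, 1, Integer::sum);
    }

    public static void main(String[] args) {
        MergingCountModel unmerged = new MergingCountModel(1000L, false);
        MergingCountModel merged = new MergingCountModel(1000L, true);
        for (long t : new long[] {0L, 100L, 200L}) {
            System.out.println(unmerged.onElement(t) + " vs " + merged.onElement(t));
        }
        // prints "1 vs 1", "1 vs 2", "1 vs 3"
    }
}
```

If this model matches what the job does, the count would never reach maxCount without merging the counter in `onMerge`. Note also that CountTrigger clears its counter before firing, which the `onElement` above does not do.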