1
votes

I'm trying to bucket the objects in an incoming stream based on two criteria.

  1. If the number of objects collected reaches N, bucket them and send them downstream.
  2. If the time since the last object arrived is >= timeout, bucket what has been collected and send it downstream.
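
To make the intended behaviour concrete, here is a minimal plain-Java sketch of the two criteria, independent of Flink. The class name CountOrGapBucketer, its bucket method, and the use of explicit arrival times in place of a processing-time clock are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "flush after N elements or after a quiet gap"; not Flink code. */
final class CountOrGapBucketer {

    /**
     * @param arrivalTimes arrival time of each element in milliseconds, ascending
     * @param maxCount     flush as soon as a bucket holds this many elements
     * @param gapMillis    flush when the next element arrives at least this long after the previous one
     * @return the arrival times grouped into buckets, in order
     */
    static List<List<Long>> bucket(long[] arrivalTimes, int maxCount, long gapMillis) {
        List<List<Long>> buckets = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : arrivalTimes) {
            // Criterion 2: the previous element was followed by a gap >= timeout.
            if (!current.isEmpty() && t - current.get(current.size() - 1) >= gapMillis) {
                buckets.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
            // Criterion 1: the bucket has reached N elements.
            if (current.size() >= maxCount) {
                buckets.add(current);
                current = new ArrayList<>();
            }
        }
        // The stream went quiet after the last element, so the timeout flushes the rest.
        if (!current.isEmpty()) {
            buckets.add(current);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 5_000, 5_100};
        System.out.println(bucket(arrivals, 3, 1_000));
    }
}
```

With N = 3 and a 1000 ms timeout, arrivals at 0, 100, 200, 5000 and 5100 are grouped as the first three elements (count reached) followed by the last two (flushed by the timeout).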

Both of these functionalities are available in Flink separately as CountTrigger and ProcessingTimeSessionWindows.

I'm trying to combine the two by writing a custom trigger and extending ProcessingTimeSessionWindows to use that trigger. It fires for the second condition but not the first. Since the stream is not a keyed stream, I can't use ValueState to store the count, so I was wondering what alternatives I have.

The code is below:

public class ProcessingTimeCountSessionWindow extends ProcessingTimeSessionWindows {
    private static final long serialVersionUID = 786L;

    private final int count;

    private ProcessingTimeCountSessionWindow(int count, long timeout) {
        super(timeout);
        this.count = count;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeCountTrigger.create(count);
    }

    /**
     * Creates a new {@code ProcessingTimeCountSessionWindow} {@link WindowAssigner} that assigns
     * elements to sessions based on the current processing time.
     *
     * @param count Max count of elements in session i.e. the upper bound on count gap between sessions
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeCountSessionWindow withCountAndGap(int count, Time size) {
        return new ProcessingTimeCountSessionWindow(count, size.toMilliseconds());
    }

}

CountTrigger uses a ReducingState, but my stream isn't keyed, so that doesn't work.

The custom trigger is below:

public class ProcessingTimeCountTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 786L;

    private final int maxCount;

    private final ReducingStateDescriptor<Integer> countStateDesc =
            new ReducingStateDescriptor<>("window-count", new ReduceFunctions.IntSum(), IntSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        count.add(1);
        if (count.get() >= maxCount) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDesc).clear();
    }

    public static ProcessingTimeCountTrigger create(int maxCount) {
        return new ProcessingTimeCountTrigger(maxCount);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" + maxCount + ")";
    }

}

2 Answers

2
votes

I was able to solve it by copying CountTrigger verbatim and changing onProcessingTime to the following:

@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE_AND_PURGE;
}

I also did not need to extend ProcessingTimeSessionWindows, since I can just attach the custom trigger to it. Unfortunately, we can't extend CountTrigger because of its private constructor; otherwise that would have been the best solution.

So the final code looks something like this:

consoleInput.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
        .trigger(ProcessingTimeCountTrigger.of(10L))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                // Copy the window's elements into a list so they can be counted.
                List<String> alphaList = new ArrayList<>();
                elements.forEach(alphaList::add);
                out.collect("Time is " + new Date().toString());
                out.collect("Total " + alphaList.size() + " elements in window");
            }
        });

This sends the bucketed data downstream if we have 10 elements OR it has been 10 seconds since we last saw an element.

The custom trigger code is below:

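import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class ProcessingTimeCountTrigger<W extends Window> extends Trigger<Object, W> {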
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" +  maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count,
     * or when a processing-time timer for the window fires, whichever happens first.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ProcessingTimeCountTrigger<W> of(long maxCount) {
        return new ProcessingTimeCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
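
A likely reason the count condition never fired with the trigger in the question is that trigger state obtained through getPartitionedState is scoped to a single window, and session windows grow by merging: each element starts out in its own window, which is then merged into the session. Unless onMerge calls ctx.mergePartitionedState(stateDesc), as the trigger above does, the merged window starts counting from scratch. The following plain-Java model is only a sketch of that effect; SessionCountModel, its string window keys, and the mergeCounts flag are hypothetical and do not mirror Flink's internals.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-window element counter under session-window merging; not Flink code. */
final class SessionCountModel {
    // One counter per window, standing in for trigger state that is scoped to a window.
    private final Map<String, Long> countByWindow = new HashMap<>();
    private final long gapMillis;
    private final boolean mergeCounts;
    // Current session window [start, end); -1 means no session yet.
    private long start = -1L;
    private long end = -1L;

    SessionCountModel(long gapMillis, boolean mergeCounts) {
        this.gapMillis = gapMillis;
        this.mergeCounts = mergeCounts;
    }

    /** Adds an element at time t (non-negative, ascending) and returns the count seen for its window. */
    long add(long t) {
        long newEnd = t + gapMillis;
        long carried = 0L;
        if (start >= 0 && t < end) {
            long mergedEnd = Math.max(end, newEnd);
            if (mergedEnd != end) {
                // The session grows into a new window; the old window's counter is dropped
                // unless it is explicitly merged into the new one.
                Long old = countByWindow.remove(key(start, end));
                if (mergeCounts && old != null) {
                    carried = old;
                }
                end = mergedEnd;
            }
        } else {
            // No overlap with the current session: start a new one.
            start = t;
            end = newEnd;
        }
        return countByWindow.merge(key(start, end), 1L + carried, Long::sum);
    }

    private static String key(long start, long end) {
        return start + "-" + end;
    }

    public static void main(String[] args) {
        SessionCountModel withoutMerge = new SessionCountModel(1_000, false);
        SessionCountModel withMerge = new SessionCountModel(1_000, true);
        for (long t : new long[] {0, 100, 200}) {
            System.out.println(withoutMerge.add(t) + " vs " + withMerge.add(t));
        }
    }
}
```

In this model the counter without merging stays at 1 for every element of a growing session, so a threshold such as 10 is never reached, while the merged counter increases with each element.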
0
votes

Does AsadSMalik's code really work? The default ProcessingTimeTrigger also registers a timer so that the window fires when its max timestamp is reached. The source code is below:

ctx.registerProcessingTimeTimer(window.maxTimestamp());