A normal one-second processing-time window contains all of the events that occur during one second, for any second in which there is at least one event. But this window is not aligned to the first event; it is aligned to the time-of-day clock. So if, for example, the first event in a window occurs halfway through a given second, that window will only include events for the 500 msec following the first one.
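To make the alignment concrete, here is a minimal plain-Java sketch of the arithmetic, assuming windows are aligned to the epoch with no offset and timestamps are non-negative. The class and method names (WindowAlignment, windowStart, remainingMs) and the sample timestamp are hypothetical and only illustrate the idea; this is not Flink's own code.

```java
public class WindowAlignment {

    // Start of the clock-aligned window that contains the given timestamp.
    // Assumes a non-negative timestamp and no window offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // How much of that window is still left once the given timestamp is reached.
    static long remainingMs(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - timestampMs;
    }

    public static void main(String[] args) {
        long sizeMs = 1000L;
        long firstEventMs = 12_500L; // hypothetical: first event arrives halfway through a second

        System.out.println("window start: " + windowStart(firstEventMs, sizeMs));
        System.out.println("time left in window: " + remainingMs(firstEventMs, sizeMs) + " ms");
    }
}
```

With a first event at 12,500 ms, the window starts at 12,000 ms and only 500 ms of it remain, which is the behavior described above.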
A ProcessingTimeTrigger fires once at the end of the window. A ContinuousProcessingTimeTrigger fires repeatedly at some specified rate.
To get precisely the semantics you've described, you'll need a custom Trigger. You could do something similar to this OneSecondIntervalTrigger example, except that you'll want to switch from using event time to processing time, and only trigger once, rather than repeatedly.
That will leave you with something like this:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {

    @Override
    public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // firstSeen is null until the first element has been seen
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

        // register initial timer only for first element
        if (firstSeen.value() == null) {
            // FIRE the window 1000 msec after the first event
            long now = ctx.getCurrentProcessingTime();
            ctx.registerProcessingTimeTimer(now + 1000);
            firstSeen.update(true);
        }
        // Continue. Do not evaluate window now
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Continue. We don't use event time timers
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Evaluate the window now, then discard its contents
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
        // Clear trigger state
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
        firstSeen.clear();
    }
}