I have a DataStream of events, each carrying a property that identifies the production batch the element was produced in. That property, let's call it 'batchNumber' (batchnr in the code below), is constant across all events I ingest from the same production batch, and I receive multiple events per batch.
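For context, the event type looks roughly like this (simplified; only batchnr matters for this question, the other fields are just illustrative placeholders):

    // Simplified POJO for the ingested events; only batchnr is relevant here,
    // the remaining fields stand in for the actual measurement data.
    public class ImaginePaperData {
        public int batchnr;        // production batch number, constant within one batch
        public long timestamp;     // time the measurement was taken
        public double measurement; // machine performance value
    }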
I want to analyze machine performance within a batch whenever the 'batchNumber' changes. My approach is to key the stream by 'batchNumber' and put it into a global window, expecting that each window then contains every event with that 'batchNumber'. On top of that I define a custom trigger that should fire when the 'batchNumber' changes, so that I can analyze the aggregated data in a ProcessWindowFunction.
My problems are:
- The trigger isn't always firing when the batchNumber changes
- Even if it does fire, there is only one element being aggregated. I'm expecting close to 200.
This is the trigger I'm using:
    public class BatchNrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

        private static final long serialVersionUID = 1L;

        // Keyed state holding the batchnr of the previously seen element.
        private final ValueStateDescriptor<Integer> prevBatchnr =
                new ValueStateDescriptor<>("batchnr", Integer.class);

        public BatchNrTrigger() {}

        @Override
        public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
            ValueState<Integer> batchnrState = ctx.getPartitionedState(prevBatchnr);

            // Fire when there is no previous batchnr yet, or when it has changed.
            if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {
                System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr
                        + " ==> should fire and process elements from window!");
                batchnrState.update(element.batchnr);
                return TriggerResult.FIRE;
            }

            System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr
                    + " ==> should not fire and continue ingesting elements!");
            batchnrState.update(element.batchnr);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
    }
This is how I call this trigger:
    DataStream<String> imaginePaperDataStream = nifiStreamSource
            .map(new ImaginePaperDataConverter())
            .keyBy((ImaginePaperData event) -> event.batchnr)
            .window(GlobalWindows.create())
            .trigger(new BatchNrTrigger())
            .process(new ImaginePaperWindowReportFunction());
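For completeness, a stripped-down version of the window function (the real one computes more statistics, but even a simple count of the window contents shows the problem):

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.util.Collector;

    // Minimal sketch of the window function: it only counts and reports the
    // elements handed to it for one key/window, which is where I see "1"
    // instead of the ~200 elements I expect.
    public class ImaginePaperWindowReportFunction
            extends ProcessWindowFunction<ImaginePaperData, String, Integer, GlobalWindow> {

        @Override
        public void process(Integer key, Context context,
                            Iterable<ImaginePaperData> elements, Collector<String> out) {
            int count = 0;
            for (ImaginePaperData element : elements) {
                count++;
            }
            out.collect("batch " + key + ": " + count + " elements in window");
        }
    }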
I'm aware that this question is similar to an existing one, but I am using ValueState and I don't think my firing condition is similar at all.
How can I get this working?