If your stream is in order by time (it only needs to be sorted for each individual device), then you can transform the stream to make this analysis easier. A RichFlatMapFunction like this, applied to the stream after keying it by device (the ValueState it uses is keyed state), will transform the sequence of ON/OFF events into a sequence of state CHANGE events:
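
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Declare this as a nested class of your job class.
    static class DetectChanges extends RichFlatMapFunction<String, String> {
        // Keyed state: the last ON/OFF value seen for the current key (device).
        private transient ValueState<String> previousState;

        @Override
        public void open(Configuration parameters) throws Exception {
            previousState = getRuntimeContext().getState(new ValueStateDescriptor<>("previousState", String.class));
        }

        @Override
        public void flatMap(String onOrOff, Collector<String> out) throws Exception {
            // Compare by value (!= would compare references); the state is null for a key's first event.
            if (!onOrOff.equals(previousState.value())) {
                out.collect("CHANGE");
                previousState.update(onOrOff);
            }
        }
    }
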
Now the problem has been reduced to determining whether the stream contains some number of CHANGE events within an interval of time. This can be done with sliding windows, or with CEP if you prefer.
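
To make the counting step concrete, here is a minimal plain-Java sketch of the check itself, outside of Flink. It assumes you already have the timestamps of the CHANGE events for one device, sorted ascending. The class name `ChangeBurstSketch`, the method `hasBurst`, and its parameters are hypothetical names for illustration. Note that it slides continuously over the events, whereas Flink's sliding windows are aligned to the slide interval, so treat it as an illustration of the idea rather than a drop-in equivalent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ChangeBurstSketch {

    /**
     * Returns true if at least minChanges CHANGE events fall inside some
     * half-open span [t, t + windowMillis). Timestamps must be sorted ascending.
     */
    static boolean hasBurst(long[] changeTimestamps, int minChanges, long windowMillis) {
        Deque<Long> recent = new ArrayDeque<>();
        for (long ts : changeTimestamps) {
            recent.addLast(ts);
            // Evict CHANGE events that are too old to share a window with ts.
            while (!recent.isEmpty() && ts - recent.peekFirst() >= windowMillis) {
                recent.removeFirst();
            }
            if (recent.size() >= minChanges) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1_000L, 2_000L, 60_000L};
        System.out.println(hasBurst(timestamps, 3, 5_000L)); // true: three changes within 5 s
        System.out.println(hasBurst(timestamps, 4, 5_000L)); // false: the fourth change is too late
    }
}
```

In a real job the same question would be answered per key by a windowed count over the CHANGE stream; the sketch only shows what that count is testing.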
You could also do this entirely with CEP. Conceptually you might approach this as follows:
- define an individual Pattern that matches ON+ OFF+
- then define a Pattern group that matches that ON/OFF pattern whenever it occurs n times within some time interval
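
The two steps above can be illustrated without the CEP library. The following is a plain-Java sketch of what such a pattern would match, not Flink CEP code: it finds each ON+ OFF+ cycle in a time-ordered list of events for one device, then checks whether n cycles fit inside a time interval. The `Event` type, the class `OnOffCycleSketch`, and both method names are hypothetical, and measuring the interval from the first ON of the first cycle to the first OFF of the nth cycle is an assumption about how you would want the time bound to apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OnOffCycleSketch {

    /** Hypothetical event type: a timestamp plus the reported state. */
    static final class Event {
        final long timestamp;
        final boolean on;

        Event(long timestamp, boolean on) {
            this.timestamp = timestamp;
            this.on = on;
        }
    }

    /**
     * Finds each ON+ OFF+ cycle and returns {start, end} pairs, where start is
     * the first ON of a run and end is the first OFF that follows it.
     * Events must be in time order.
     */
    static List<long[]> findCycles(List<Event> events) {
        List<long[]> cycles = new ArrayList<>();
        long start = -1L;          // timestamp of the first ON in the current run
        boolean inOnRun = false;   // true while we have seen ON but no OFF yet
        for (Event e : events) {
            if (e.on) {
                if (!inOnRun) {
                    start = e.timestamp;
                    inOnRun = true;
                }
            } else if (inOnRun) {
                cycles.add(new long[] {start, e.timestamp});
                inOnRun = false;
            }
            // Further OFFs with no pending ON run are ignored.
        }
        return cycles;
    }

    /** True if n consecutive cycles span less than windowMillis in total. */
    static boolean cyclesWithin(List<Event> events, int n, long windowMillis) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        List<long[]> cycles = findCycles(events);
        for (int i = 0; i + n <= cycles.size(); i++) {
            long span = cycles.get(i + n - 1)[1] - cycles.get(i)[0];
            if (span < windowMillis) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(0L, true), new Event(1L, true), new Event(2L, false),
            new Event(3L, false), new Event(4L, true), new Event(5L, false));
        System.out.println(cyclesWithin(events, 2, 10L)); // true: two cycles between t=0 and t=5
    }
}
```

With CEP, the inner loop of `findCycles` corresponds to the individual ON+ OFF+ pattern, and `cyclesWithin` corresponds to the group pattern that repeats it n times under a time constraint.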