I'm having a bit of a battle with the Flink CEP greedy operator.
Given the below java code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> strings = Arrays.asList("1,3,5,5,5,5,6,".split(","));
DataStream<String> input = env.fromCollection(strings);
Pattern<String, ?> pattern = Pattern.<String>
begin("start").where(new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("5");
}
}).oneOrMore().greedy()
.followedBy("end").where(new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("6");
}
});
PatternStream<String> patternStream = CEP.pattern(input, pattern);
DataStream<String> result = patternStream.select(new PatternSelectFunction<String, String>() {
@Override
public String select(Map<String, List<String>> pattern) throws Exception {
System.err.println("=======");
pattern.values().forEach(match -> match.forEach(event -> System.err.println(event)));
System.err.println("=======");
return "-";
}
});
result.print();
env.execute("Flink Streaming Java API Skeleton");
I would like to see: only "5 5 5 5 6" emitted
However, it matches "5 5 5 5 6", "5 5 5 6", "5 5 6", "5 6"
If I do:
begin("start").where(new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("3");
}
}).followedBy("middle").where(new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("5");
}
}).oneOrMore().greedy()
.followedBy("end").where(new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.equals("6");
}
});
However, (thus providing a different starting match) the Greedy operator works as expected by emitting "3 5 5 5 5 6".
Is it possible to have a greedy matcher grab all matches without having a different starting pattern?
Or am I missing something?
Stephan
.until
to replace.followedBy
after.oneOrMore().greedy()
? – Leyla Lee