0
votes

I'm having a bit of a battle with the Flink CEP greedy operator.

Given the Java code below:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<String> strings = Arrays.asList("1,3,5,5,5,5,6,".split(","));

    DataStream<String> input = env.fromCollection(strings);

    Pattern<String, ?> pattern = Pattern.<String>
    begin("start").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    }).oneOrMore().greedy()
    .followedBy("end").where(new SimpleCondition<String>() {

        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

    PatternStream<String> patternStream = CEP.pattern(input, pattern);

    DataStream<String> result = patternStream.select(new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> pattern) throws Exception {
            System.err.println("=======");
            pattern.values().forEach(match -> match.forEach(event -> System.err.println(event)));
            System.err.println("=======");
            return "-";
        }
    });

    result.print();
    env.execute("Flink Streaming Java API Skeleton");

I would like only "5 5 5 5 6" to be emitted.

However, it matches "5 5 5 5 6", "5 5 5 6", "5 5 6" and "5 6".
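To make the observed output concrete, here is a minimal plain-Java sketch that reproduces the four matches. It is only a simplified model, not Flink's NFA: it assumes every "5" may open its own partial match, and it ignores relaxed contiguity (events between the "5"s and the "6" are not skipped). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: models the behaviour described above.
class GreedyMatchSketch {

    // Every "5" opens a new partial match. Each partial match greedily takes
    // all directly following "5"s and completes only if a "6" comes next.
    static List<List<String>> allMatches(List<String> events) {
        List<List<String>> matches = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            if (!events.get(start).equals("5")) {
                continue; // only a "5" can begin the pattern
            }
            int i = start;
            while (i < events.size() && events.get(i).equals("5")) {
                i++; // greedy: consume as many "5"s as possible
            }
            if (i < events.size() && events.get(i).equals("6")) {
                matches.add(new ArrayList<>(events.subList(start, i + 1)));
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("1,3,5,5,5,5,6,".split(","));
        // One line per match, longest first.
        allMatches(events).forEach(System.out::println);
    }
}
```

In this model the greedy quantifier only decides how far each individual match extends; it does not stop later events from starting matches of their own, which is why the shorter suffixes show up as well.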

If I do:

    Pattern<String, ?> pattern = Pattern.<String>begin("start").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("3");
        }
    }).followedBy("middle").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    }).oneOrMore().greedy()
    .followedBy("end").where(new SimpleCondition<String>() {

        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

With this pattern (which provides a different starting match), the greedy operator works as expected and emits only "3 5 5 5 5 6".

Is it possible to have a greedy matcher grab all matches without having a different starting pattern?

Or am I missing something?

Stephan

2
I've been playing with the org.apache.flink.cep.nfa.NFA class and its process(....) method. Looking at discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy): the only result I'm interested in is result[0]. From playing with the code, it doesn't look like "afterMatchSkipStrategy" will allow me to get only the largest match. - Stephan Kotze
Have you tried using .until in place of .followedBy after .oneOrMore().greedy()? - Leyla Lee
Thanks Leyla. I've given that a go and tried a few more ways of using until as well, but I'm still getting the non-greedy matches published. I feel I'm missing something in my understanding of the CEP engine implementation here, as this would appear to be a rather common use case, no? - Stephan Kotze
Did you find a solution or understand how it works? I am facing a similar issue. If you use, for example, a GlobalWindow and a custom Trigger, you could probably evict messages 5,5,5,6. Does this notion not apply in Flink CEP? - user390517
There's a known bug about greedy matching that may explain the behavior: issues.apache.org/jira/browse/FLINK-8914 - Chesnay Schepler

2 Answers

1
votes

Thanks to Chesnay Schepler for his comment above:

There's a known bug about greedy matching that may explain the behavior: issues.apache.org/jira/browse/FLINK-8914

I'll note this as the answer for the time being.

0
votes

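To control how many matches an event can be assigned to, you need to specify an AfterMatchSkipStrategy. With skipPastLastEvent(), every partial match that started after the emitted match started but before it ended is discarded, so the shorter overlapping matches are dropped.

Use Pattern.begin("start", AfterMatchSkipStrategy.skipPastLastEvent()):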

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    List<String> strings = Arrays.asList("1,3,5,5,5,5,6,".split(","));

    DataStream<String> input = env.fromCollection(strings);

    Pattern<String, ?> pattern = Pattern.<String>
    begin("start", AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    }).oneOrMore().greedy()
    .followedBy("end").where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

    PatternStream<String> patternStream = CEP.pattern(input, pattern);

    DataStream<String> result = patternStream.select(new PatternSelectFunction<String, String>() {
        @Override
        public String select(Map<String, List<String>> pattern) throws Exception {
            System.err.println("=======");
            pattern.values().forEach(match -> match.forEach(event -> System.err.println(event)));
            System.err.println("=======");
            return "-";
        }
    });

    result.print();
    env.execute("Flink Streaming Java API Skeleton");
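
As a rough illustration of why this skip strategy leaves only the longest match, the sketch below applies the "skip past last event" rule to candidate matches given as index spans. It is an assumption-laden model in plain Java, not Flink code: the class name, the method name and the {start, end} span representation are all hypothetical, and candidates are assumed to arrive ordered by start index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the "skip past last event" rule, not part of Flink.
class SkipPastLastEventSketch {

    // Each span is {startIndex, endIndex} of a candidate match in the input,
    // ordered by startIndex. A candidate is dropped when it starts at or
    // before the last event of a match that was already emitted.
    static List<int[]> keep(List<int[]> spans) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1; // index of the last event of the latest emitted match
        for (int[] span : spans) {
            if (span[0] <= lastEnd) {
                continue; // started inside an emitted match: discard
            }
            kept.add(span);
            lastEnd = span[1];
        }
        return kept;
    }

    public static void main(String[] args) {
        // Candidate spans for the input 1,3,5,5,5,5,6: one per starting "5".
        List<int[]> candidates = Arrays.asList(
                new int[] {2, 6}, new int[] {3, 6}, new int[] {4, 6}, new int[] {5, 6});
        for (int[] span : keep(candidates)) {
            System.out.println(span[0] + ".." + span[1]);
        }
    }
}
```

Under these assumptions only the span starting at the first "5" survives, which corresponds to "5 5 5 5 6" in the question's input.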