5
votes

I have a scenario where I have to change the state if a second event did not follow first event within x seconds. For e.g. user did not logout in 100 mins, consider him to be in invalid state. How can this be designed using the current pattern operations?

2

2 Answers

2
votes

At the moment this is not possible to do. The solution would be to have a timeout handler which is triggered whenever a event sequence is discarded because it falls out of the defined time window. There is already a JIRA issue which tracks the timeout handler implementation.

5
votes

As this has been implemented already, I thought of answering this question for those who are coming here looking for answers.

As of Flink 1.0.0, this can be done with handling the Timedout Pattern, for example, if your CEP pattern is something like this :

Example partially from from Flink Website (There are some major changes between 1.2 and 1.3 please adjust your code accordingly, this answer focuses on 1.3)

Pattern description: - Get first event of type "error", followed by a second event event of type "critical" within 10 seconds

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("error");
    }
}).followedBy("end").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("critical");
    }
}).within(Time.seconds(10));

PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern)

DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
  @Override
  public String timeout(Map<String, List<Event>> map, long l) throws Exception {
    return map.toString() +" @ "+ l;
  }
}, new PatternSelectFunction<Event, String>() {

  @Override
  public String select(Map<String, List<Event>> map) throws Exception {
    return map.toString();
  }
});

For this case, if the user doesn't logout even after 100 mins, then as the corresponding event wouldn't arrive, it would result in the pattern being timedout and the partial event(the initiating event) would be captured in the PatternTimeoutFunction.