It would be great to be able to match events based on their field values, going beyond the current capability of building patterns from events that each match separate criteria. For example, as explained at https://flink.apache.org/news/2016/04/06/cep-monitoring.html, we can do:

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

However, it would be great to build a pattern with a condition that compares the two events, such as: .where(second_evt.getTemperature() == first_evt.getTemperature())

Consider writing to [email protected] as this is rather a feature request. - Dawid Wysakowicz
Thanks Dawid. The e-mail didn't work! This is the error I got: "Hi. This is the qmail-send program at apache.org. I'm afraid I wasn't able to deliver your message to the following addresses. This is a permanent error; I've given up. Sorry it didn't work out. <[email protected]>: Must be sent from an @apache.org address or a subscriber address or an address in LDAP." - Systems User
In order to be able to send messages to the mailing list, you must subscribe by sending an email to this address: [email protected] - Alex Chermenin

1 Answer

If you want to compare field values from different events, you can do it in the flatSelect method, using a very simple pattern without any where expressions:

  1. Create a pattern:

    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
        .subtype(TemperatureEvent.class)
        .next("Second Event")
        .subtype(TemperatureEvent.class)
        .within(Time.seconds(10));
    
  2. Apply the pattern to the data stream:

    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);
    
  3. Check the values and generate a new complex event via the flatSelect method:

    DataStream<TemperatureWarning> warnings = tempPatternStream.flatSelect(
        (Map<String, MonitoringEvent> pattern, Collector<TemperatureWarning> out) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            if (first.getTemperature() <= second.getTemperature()) {
                out.collect(new TemperatureWarning(
                    first.getRackID(), 
                    (first.getTemperature() + second.getTemperature()) / 2));
            }
        });
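
The check in step 3 can be tried out without a Flink job. What follows is a minimal plain-Java sketch, not Flink code: `PairCheckSketch`, its nested `TemperatureEvent` and `TemperatureWarning` classes, and the `match` helper are hypothetical stand-ins for the classes used above, and a plain `List` takes the place of the `Collector`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PairCheckSketch {

    // Hypothetical stand-in for the TemperatureEvent class used in the answer.
    static final class TemperatureEvent {
        final int rackID;
        final double temperature;

        TemperatureEvent(int rackID, double temperature) {
            this.rackID = rackID;
            this.temperature = temperature;
        }
    }

    // Hypothetical stand-in for the TemperatureWarning complex event.
    static final class TemperatureWarning {
        final int rackID;
        final double averageTemperature;

        TemperatureWarning(int rackID, double averageTemperature) {
            this.rackID = rackID;
            this.averageTemperature = averageTemperature;
        }
    }

    // Same comparison as the flatSelect lambda: emit a warning only when the
    // second reading is at least as high as the first one.
    static void checkPair(Map<String, TemperatureEvent> pattern, List<TemperatureWarning> out) {
        TemperatureEvent first = pattern.get("First Event");
        TemperatureEvent second = pattern.get("Second Event");

        if (first.temperature <= second.temperature) {
            out.add(new TemperatureWarning(
                first.rackID,
                (first.temperature + second.temperature) / 2));
        }
    }

    // Builds the name-to-event map that a matched pattern would hand to the lambda.
    static Map<String, TemperatureEvent> match(TemperatureEvent first, TemperatureEvent second) {
        Map<String, TemperatureEvent> pattern = new LinkedHashMap<>();
        pattern.put("First Event", first);
        pattern.put("Second Event", second);
        return pattern;
    }

    public static void main(String[] args) {
        List<TemperatureWarning> out = new ArrayList<>();
        // Rising temperature: a warning is emitted.
        checkPair(match(new TemperatureEvent(1, 100.0), new TemperatureEvent(1, 104.0)), out);
        // Falling temperature: the pair is dropped.
        checkPair(match(new TemperatureEvent(2, 100.0), new TemperatureEvent(2, 90.0)), out);

        for (TemperatureWarning w : out) {
            System.out.println(w.rackID + " " + w.averageTemperature); // prints "1 102.0"
        }
    }
}
```

To get the equality check from the question instead, the `<=` comparison would become `==` (or a tolerance check, since the temperatures are floating-point values in this sketch).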