
I am a newbie to Apache Flink and I am trying to evaluate patterns dynamically in a stream using Flink CEP. I am trying to find the users who performed the sequence of actions login, addtocart, logout, and my job is able to detect that pattern. But if I define multiple patterns, for example an additional login, logout pattern, it is not able to detect the second pattern.

Below is my code

Action class

public class Action {

    public int userID;
    public String action;

    public Action() {
    }

    public Action(int userID, String action) {
        this.userID = userID;
        this.action = action;
    }

    public int getUserID() {
        return userID;
    }

    public void setUserID(int userID) {
        this.userID = userID;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    @Override
    public String toString() {
        return "Action [userID=" + userID + ", action=" + action + "]";
    }

}

Pattern class

public class Pattern {

    public String firstAction;
    public String secondAction;
    public String thirdAction;

    public Pattern() {

    }

    public Pattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public Pattern(String firstAction, String secondAction, String thirdAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
        this.thirdAction = thirdAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }

    public String getThirdAction() {
        return thirdAction;
    }

    public void setThirdAction(String thirdAction) {
        this.thirdAction = thirdAction;
    }

    @Override
    public String toString() {
        return "Pattern [firstAction=" + firstAction + ", secondAction=" + secondAction + ", thirdAction=" + thirdAction
                + "]";
    }



}

Main class

public class CEPBroadcast {

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

        private static final long serialVersionUID = 1L;

        ValueState<String> prevActionState;

        MapStateDescriptor<Void, Pattern> patternDesc;

        @Override
        public void open(Configuration conf) throws IOException {
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
            patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            bcState.put(null, pattern);

        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {
            Pattern pattern = ctx.getBroadcastState(this.patternDesc).get(null);
            String prevAction = prevActionState.value();

            if (pattern != null && prevAction != null) {

                if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(prevAction)
                        && pattern.thirdAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                } else if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }

            prevActionState.update(action.action);

        }

    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

        DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

        KeyedStream<Action, Integer> actionByUser = actions
                .keyBy((KeySelector<Action, Integer>) action -> action.userID);

        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID,
                Types.POJO(Pattern.class));

        BroadcastStream<Pattern> bcedPattern = pattern.broadcast(bcStateDescriptor);

        DataStream<Tuple2<Integer, Pattern>> matches = actionByUser.connect(bcedPattern)
                .process(new PatternEvaluator());

        matches.flatMap(new FlatMapFunction<Tuple2<Integer, Pattern>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Tuple2<Integer, Pattern> value, Collector<String> out) throws Exception {

                if (value.f1.thirdAction != null) {
                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction + "," + value.f1.thirdAction);
                } else {

                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction);

                }

            }

        }).print();

        env.execute("CEPBroadcast");

    }

}

If I give it a single Pattern to evaluate, I get output like the following:

DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

Output: User ID: 1001,Pattern matched:login,logout

If I try to give it multiple patterns to evaluate, as below, it does not evaluate the second pattern. Please suggest how I can evaluate multiple patterns. Thanks in advance.

DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "addtocart", "logout"),
                new Pattern("login", "logout"));

Output: User ID: 1003,Pattern matched:login,addtocart,logout

1 Answer


There are a couple of reasons why this isn't working:

(1) Whenever you have a Flink operator with multiple input streams, like the PatternEvaluator in your application, you have no control over how that operator will read from its inputs. In your case it might fully consume the events from the Action stream before reading the Patterns, or vice versa, or it might interleave the two streams. In a sense, you are lucky it matched anything at all.

Solving this problem won't be easy. If you know all the patterns at compile time (in other words, if they aren't actually dynamic), then you could use Flink CEP, or MATCH_RECOGNIZE from Flink SQL.
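For example, here is a rough (untested) sketch of the fixed login -> addtocart -> logout sequence using the Flink CEP Pattern API. Note that Flink's org.apache.flink.cep.pattern.Pattern class collides with the Pattern POJO in your code, so the POJO would need to be renamed or referenced with a fully qualified name; actionByUser is the keyed stream from your main method, and the CEP classes come from the org.apache.flink.cep packages.

// Fixed three-step sequence, expressed with the Flink CEP library
Pattern<Action, ?> sequence = Pattern.<Action>begin("login")
        .where(new SimpleCondition<Action>() {
            @Override
            public boolean filter(Action a) {
                return "login".equals(a.action);
            }
        })
        .next("addtocart")
        .where(new SimpleCondition<Action>() {
            @Override
            public boolean filter(Action a) {
                return "addtocart".equals(a.action);
            }
        })
        .next("logout")
        .where(new SimpleCondition<Action>() {
            @Override
            public boolean filter(Action a) {
                return "logout".equals(a.action);
            }
        });

// Apply the pattern per user and emit one line per complete match
DataStream<String> cepMatches = CEP.pattern(actionByUser, sequence)
        .select(new PatternSelectFunction<Action, String>() {
            @Override
            public String select(Map<String, List<Action>> match) {
                return "User ID: " + match.get("login").get(0).userID
                        + ",Pattern matched:login,addtocart,logout";
            }
        });

cepMatches.print();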

If you truly need dynamic patterns, then you'll have to find a way to block the action stream until the patterns have been read. This topic ("side inputs") has been covered before in other questions here on SO. For example, see How to unit test BroadcastProcessFunction in flink when processElement depends on broadcasted data. (Or you could adjust your expectations, and be satisfied that only actions processed after a pattern has been stored can be matched against that pattern.)
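One rough (untested) sketch of that idea: park any actions that arrive before a pattern has been broadcast in ListState, and replay them once a pattern is available. The class name, state names, and the evaluate helper are illustrative assumptions, and only the two-step check is shown to keep the sketch short. Note the caveat that buffered actions are only replayed when a later action arrives for that user; flushing them as soon as a pattern arrives would require Context#applyToKeyedState in processBroadcastElement.

public static class BufferingPatternEvaluator
        extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

    private static final long serialVersionUID = 1L;

    ValueState<String> prevActionState;
    ListState<Action> bufferedActions;
    MapStateDescriptor<Void, Pattern> patternDesc;

    @Override
    public void open(Configuration conf) {
        prevActionState = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        bufferedActions = getRuntimeContext()
                .getListState(new ListStateDescriptor<>("buffered", Types.POJO(Action.class)));
        patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
    }

    @Override
    public void processBroadcastElement(Pattern pattern, Context ctx,
            Collector<Tuple2<Integer, Pattern>> out) throws Exception {
        ctx.getBroadcastState(patternDesc).put(null, pattern);
    }

    @Override
    public void processElement(Action action, ReadOnlyContext ctx,
            Collector<Tuple2<Integer, Pattern>> out) throws Exception {
        Pattern pattern = ctx.getBroadcastState(patternDesc).get(null);

        if (pattern == null) {
            // no pattern has reached this task yet: hold the action back
            bufferedActions.add(action);
            return;
        }

        // first replay anything buffered before the pattern arrived, then the new action
        for (Action buffered : bufferedActions.get()) {
            evaluate(buffered, pattern, ctx, out);
        }
        bufferedActions.clear();
        evaluate(action, pattern, ctx, out);
    }

    private void evaluate(Action action, Pattern pattern, ReadOnlyContext ctx,
            Collector<Tuple2<Integer, Pattern>> out) throws Exception {
        String prevAction = prevActionState.value();
        // two-step check only, to keep the sketch short
        if (prevAction != null
                && pattern.firstAction.equals(prevAction)
                && pattern.secondAction.equals(action.action)) {
            out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
        }
        prevActionState.update(action.action);
    }
}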

(2) By using null as the key when storing the patterns via

bcState.put(null, pattern);

you are overwriting the first pattern with the second one when it arrives. There is never a time when both patterns are available for matching.

To match the input against two different patterns you'll need to modify your PatternEvaluator to handle simultaneous matching for both patterns. This will entail storing both patterns in broadcast state, considering both of them in processElement, and having instances of prevActionState for both patterns. You might want to give the patterns IDs, use those IDs as keys in the broadcast state, and use MapState for the prevActionState, once again keyed by the pattern ID.
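Here is a rough (untested) sketch of what that might look like. It assumes the pattern's toString() is used as its ID (you would probably want a real ID field), that the MapStateDescriptor passed to broadcast() in main is switched to the same String key type, and that MapState and java.util.Map are imported.

public static class MultiPatternEvaluator
        extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

    private static final long serialVersionUID = 1L;

    // last action and the action before it, tracked separately per pattern ID
    MapState<String, String> lastActionPerPattern;
    MapState<String, String> secondLastActionPerPattern;

    MapStateDescriptor<String, Pattern> patternDesc;

    @Override
    public void open(Configuration conf) {
        lastActionPerPattern = getRuntimeContext()
                .getMapState(new MapStateDescriptor<>("lastAction", Types.STRING, Types.STRING));
        secondLastActionPerPattern = getRuntimeContext()
                .getMapState(new MapStateDescriptor<>("secondLastAction", Types.STRING, Types.STRING));
        patternDesc = new MapStateDescriptor<>("patterns", Types.STRING, Types.POJO(Pattern.class));
    }

    @Override
    public void processBroadcastElement(Pattern pattern, Context ctx,
            Collector<Tuple2<Integer, Pattern>> out) throws Exception {
        // store every pattern under its own ID instead of overwriting a single null key
        ctx.getBroadcastState(patternDesc).put(pattern.toString(), pattern);
    }

    @Override
    public void processElement(Action action, ReadOnlyContext ctx,
            Collector<Tuple2<Integer, Pattern>> out) throws Exception {
        // evaluate the incoming action against every pattern currently in broadcast state
        for (Map.Entry<String, Pattern> entry : ctx.getBroadcastState(patternDesc).immutableEntries()) {
            String patternId = entry.getKey();
            Pattern pattern = entry.getValue();

            String last = lastActionPerPattern.get(patternId);
            String secondLast = secondLastActionPerPattern.get(patternId);

            if (pattern.thirdAction == null) {
                // two-step pattern: previous action, then this action
                if (pattern.firstAction.equals(last) && pattern.secondAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            } else {
                // three-step pattern: the two previous actions, then this action
                if (pattern.firstAction.equals(secondLast) && pattern.secondAction.equals(last)
                        && pattern.thirdAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }

            // shift the per-pattern history forward
            if (last != null) {
                secondLastActionPerPattern.put(patternId, last);
            }
            lastActionPerPattern.put(patternId, action.action);
        }
    }
}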

Update:

Keep in mind that when you use the DataStream API to write a streaming job, you are not defining the sequence of execution as you would in a typical procedural application. You are instead describing the topology of a dataflow graph and the behavior of the operators embedded in it; that graph is then executed in parallel when the job runs.