I am a newbie to Flink and am trying a POC in which I want to detect when no event is received for x amount of time, where x is greater than the time specified in the within() period of the CEP pattern.

public class MyCEPApplication {


    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer_group");

        FlinkKafkaConsumer<String> inputSignal = new FlinkKafkaConsumer<>("consumer_topic",
                new SimpleStringSchema(),
                properties);

        DataStream<String> inputSignalKafka = streamExecutionEnvironment.addSource(inputSignal);


        DataStream<MyEvent> eventDataStream = inputSignalKafka.map(new MapFunction<String, MyEvent>() {
            @Override
            public MyEvent map(String value) throws Exception {
                MyEvent myEvent = new MyEvent();
                myEvent.setJsonObject(new JSONObject(value));
                // No timestamp in the payload, so the arrival wall-clock time is used as event time
                myEvent.setAttribute("time", System.currentTimeMillis());
                return myEvent;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
            @Override
            public long extractTimestamp(MyEvent event, long currentTimestamp) {
                System.out.println("TIMESTAMP: " +(long)event.getAttribute("time").get());
                System.out.println("Time Difference: " +((long)(event.getAttribute("time").get()) - timeDifference));
                timeDifference = (long)(event.getAttribute("time").get());
                return (long)event.getAttribute("time").get();
            }
            @Override
            public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }
        });


        eventDataStream.print("Source=======================================>");
        
        Pattern<MyEvent, ?> pattern =
                Pattern.<MyEvent>begin("start")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
                            return event.equals("Event1");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<MyEvent>() {
                        @Override
                        public boolean filter(MyEvent event) {
                            return event.equals("Event2");
                        }
                    }).within(Time.seconds(10));


            PatternStream<MyEvent> patternStream = cepPatternMatching.compile(pattern, eventDataStream);

            OutputTag<MyEvent> timedout = new OutputTag<MyEvent>("timedout") {};
            


            SingleOutputStreamOperator<MyEvent> result = patternStream.flatSelect(
                    timedout,
                    new PatternFlatTimeoutFunction<MyEvent,MyEvent>() {
                        @Override
                        public void timeout(Map<String, List<MyEvent>> pattern, long timeoutTimestamp, Collector<MyEvent> out) throws Exception {
                            if(null != pattern.get("CustomerId")){
                                for (MyEvent timedOutEvent :pattern.get("CustomerId")){
                                    System.out.println("TimedOut Event : "+timedOutEvent.getField(0));
                                    out.collect(timedOutEvent);
                                }
                            }
                        }
                    },
                    new PatternFlatSelectFunction<MyEvent, MyEvent>() {
                        @Override
                        public void flatSelect(Map<String, List<MyEvent>> pattern,
                                               Collector<MyEvent> out) throws Exception {

                            out.collect(pattern.get("CustomerId").get(0));
                        }
                    }
            );


            result.print("=============CEP Pattern Detected==================");


            DataStream<MyEvent> timedOut = result.getSideOutput(timedout);
           
            timedOut.print("=============CEP TimedOut Pattern Detected==================");


        streamExecutionEnvironment.execute("CEPtest");

    }
}

Even if no event is received after 10 seconds, it is printing the timed-out event. I even tried commenting out the PatternFlatSelectFunction method. Is there a way or workaround to make the pattern time out if no event is received within a given x seconds? Someone asked the same question and I referred to the solutions below, but nothing worked for me. Please help me solve this issue.

1)Apache Flink CEP how to detect if event did not occur within x seconds?

2)https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/cep.html#handling-timed-out-partial-patterns

3)https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/solutions/datastream_java/cep/LongRidesCEPSolution.java

1 Answer

Your application is using event time, so you will need to arrange for a sufficiently large Watermark to be generated despite the lack of incoming events. You could use this example if you want to artificially advance the current watermark when the source is idle.
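
To make the idea of advancing the watermark on an idle source concrete, here is a minimal, dependency-free sketch of the logic. It is not Flink API: the class `IdleAwareWatermark`, its methods, and the `idleTimeoutMillis` parameter are all hypothetical names chosen for illustration. In Flink 1.10 this logic would presumably sit inside an `AssignerWithPeriodicWatermarks` implementation (recording events in `extractTimestamp()` and computing the value in `getCurrentWatermark()`), replacing the punctuated assigner from the question, which can only emit a watermark when an event arrives. It also assumes, as in the question, that event timestamps are taken from the wall clock.

```java
/**
 * Sketch of watermark logic that keeps advancing while the source is idle.
 * All names are hypothetical; this is plain Java, not Flink API.
 */
final class IdleAwareWatermark {
    private final long idleTimeoutMillis;
    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEventWallClock = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    IdleAwareWatermark(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Record an event and the wall-clock time at which it was seen. */
    void onEvent(long eventTimestamp, long wallClockMillis) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
        lastEventWallClock = wallClockMillis;
    }

    /** Compute the watermark for the given wall-clock time. */
    long currentWatermark(long wallClockMillis) {
        if (maxEventTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet, nothing to advance
        }
        long idleFor = wallClockMillis - lastEventWallClock;
        // While events flow, follow the largest timestamp seen so far.
        // Once idle for longer than the timeout, add the idle duration on top.
        long candidate = idleFor > idleTimeoutMillis
                ? maxEventTimestamp + idleFor
                : maxEventTimestamp;
        // Watermarks must never move backwards.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }
}
```

With a hypothetical idle timeout of 5 seconds, an event stamped 1000 followed by 12 seconds of silence yields a watermark of 13000, which is past the 10-second `within()` deadline of a pattern started by that event. The trade-off is that an event arriving later with a timestamp below the advanced watermark would be treated as late, so this approach fits best when event time tracks the wall clock closely.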

Given that your events don't have event-time timestamps, why don't you simply use processing time instead, and thereby avoid this problem? (Note, however, the limitation mentioned in https://stackoverflow.com/a/50357721/2000823).