1
votes

I have following case scenario

enter image description here

There are 2 Virtual machines which are sending streams to Kafka which are being received by CEP engine where warnings are generated when particular conditions are satisfied on the individual Stream.

Currently, CEP is checking for same conditions on both streams( when heart rate > 65 and respiration rate > 68) for both patients and raising alarms in Parallel as shown below

 // detecting pattern
        Pattern<joinEvent, ? > pattern = Pattern.<joinEvent>begin("start")
                .subtype(joinEvent.class).where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getHeartRate() > 65 ;
                    }
                })
                .subtype(joinEvent.class)
                .where(new FilterFunction<joinEvent>() {
                    @Override
                    public boolean filter(joinEvent joinEvent) throws Exception {
                        return joinEvent.getRespirationRate() > 68;
                    }
                }).within(Time.milliseconds(100));

But I want to use different conditions for both Streams. For example, I would like to raise alarm if

For patient A : if heart rate > 65 and Respiration Rate > 68
For patient B : if heart rate > 75 and Respiration Rate > 78

How do I achieve this ? do I need to create multiple stream environments or multiple patterns in the same environment.

1
hey, I would like to know if you find a solution of your question?Leyla Lee
Yes , different patients write to different topics and flink has many workers working in parallel , each listening to a topic and performing cepAmarjit Dhillon
Thank you for your reply, I thought different patients wrote to same source/DataStream, and you wanted to apply different CEP pattern according to the different event/Patient T.TLeyla Lee

1 Answers

2
votes

For your requirements, you can create 2 different patterns to have clear separation if you want.

If you want to perform this with the same pattern then it would be possible as well. To do this, read all your kafka topics in one kafka source:

    FlinkKafkaConsumer010<JoinEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topic1", "topic2"),
        new StringSerializerToEvent(),
        props);

Here I am assuming that the structure of your event from both the topics are the same and you have the patient name as well as part of the event which is trasnmitted.

Once you did that, it becomes easy as you just need to create a pattern with "Or", something like the following:

    Pattern.<JoinEvent>begin("first")
        .where(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("A") && event.getHeartRate() > 65 && joinEvent.getRespirationRate() > 68;
          }
        })
        .or(new SimpleCondition<JoinEvent>() {

          @Override
          public boolean filter(JoinEvent event) throws Exception {
            return event.getPatientName().equals("B") && event.getHeartRate() > 75 && joinEvent.getRespirationRate() > 78;
          }
        });

This would produce a match whenever your condition matches. Although, I am not really sure what ".within(Time.milliseconds(100))" is achieving in your example.