I want to use Flink for a remote patient monitoring scenario that involves several sensors, such as a gyroscope, an accelerometer, an ECG stream, an HR stream, an RR stream, and so on. In this scenario the streams will not all share the same data type or input rate, but I still want to detect arrhythmia or other medical conditions, which means running CEP across these multiple sensors.

As far as I know, to perform complex event processing (CEP) on these sensors I first have to combine the streams, and there are two ways to do that:

  1. Join the different streams
  2. Merge the different streams

Earlier I was joining on the sensors' timestamps, but that does not join all the events: the streams can have different rates and timestamps with microsecond resolution, so two timestamps are only rarely exactly equal.
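To illustrate the problem, here is a minimal plain-Java sketch (no Flink API involved; the class name, method names, and sample timestamps are all made up for illustration). It compares matching on exactly equal timestamps with matching inside a tolerance window. In Flink itself, an interval join on keyed streams (`intervalJoin(...).between(...)`) is one way to express this kind of windowed match, though I have not verified that it fits this use case.

```java
// Plain-Java illustration only; names and sample values are hypothetical.
class TimestampMatchSketch {

    // Counts pairs whose timestamps are exactly equal (what an equality join matches).
    static int exactMatches(long[] a, long[] b) {
        int count = 0;
        for (long ta : a) {
            for (long tb : b) {
                if (ta == tb) {
                    count++;
                }
            }
        }
        return count;
    }

    // Counts pairs whose timestamps differ by at most toleranceMicros.
    static int matchesWithin(long[] a, long[] b, long toleranceMicros) {
        int count = 0;
        for (long ta : a) {
            for (long tb : b) {
                if (Math.abs(ta - tb) <= toleranceMicros) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Timestamps in microseconds: one event per second vs. two per second,
        // with the second stream offset by 250 microseconds.
        long[] rrTimes = {1_000_000L, 2_000_000L, 3_000_000L};
        long[] qrsTimes = {1_000_250L, 1_500_250L, 2_000_250L, 2_500_250L};

        System.out.println(exactMatches(rrTimes, qrsTimes));        // prints 0
        System.out.println(matchesWithin(rrTimes, qrsTimes, 500L)); // prints 2
    }
}
```

With these sample values the equality match finds nothing, while a 500 microsecond tolerance pairs two of the three RR events.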

So I would like to go with option 2, i.e. merging the streams before doing CEP. The Flink documentation says that I can merge two streams, but they must have the same data type. I tried to do this but was unsuccessful, and I got the following error:

  Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)

Here is how I tried to perform the merge. I have two event classes for the streams, with the following attributes:

RRIntervalStreamEvent Stream

public Integer Sensor_id;
public Long time;
public Integer RRInterval;

qrsIntervalStreamEvent Stream

public Integer Sensor_id;
public Long time;
public Integer qrsInterval;

Both of these streams have generator classes that emit events of the corresponding type at the specified rate. Below is the code with which I tried to merge them.

// getting qrs interval stream
DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment
        .addSource(new Qrs_interval_Gen(input_rate_qrs_S, Total_Number_Of_Events_in_qrs))
        .name("qrs stream");

// getting RR interval stream
DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment
        .addSource(new RR_interval_Gen(input_rate_rr_S, Total_Number_Of_Events_in_RR))
        .name("RR stream");

// merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});

I had to use new DataStream[]{qrs_stream_raw} because passing qrs_stream_raw directly resulted in the error shown in the screenshot below.

[Screenshot: error snapshot]

Can someone please give me an idea about the following:

  1. How should I merge these two streams?
  2. How should I merge more than two streams?
  3. Is there an engine that can merge more than two streams with different structures? If so, which engine should I use?

1 Answer

As Alex pointed out, the two streams can be merged in Flink once both of them use the same data type. Another option is to use Siddhi or the Flink-Siddhi extension, but I wanted to do everything in Flink alone.

So here are a couple of changes I made to my program to make it work:

Step 1: Made both of my generator classes return a common type

public class RR_interval_Gen extends RichParallelSourceFunction<Tuple3<Integer,Long, Integer>>

Step 2: Declared both raw streams with the same Tuple type and then merged the two streams.

// getting qrs interval stream
DataStream<Tuple3<Integer,Long,Integer>> qrs_stream_raw = envrionment
        .addSource(new Qrs_interval_Gen(input_rate_qrs_S, Total_Number_Of_Events_in_qrs))
        .name("qrs stream");

// getting RR interval stream
DataStream<Tuple3<Integer,Long,Integer>> rr_stream_raw = envrionment
        .addSource(new RR_interval_Gen(input_rate_rr_S, Total_Number_Of_Events_in_RR))
        .name("RR stream");

// merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream = rr_stream_raw.union(qrs_stream_raw);
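One drawback of a bare Tuple3 is that, after the merge, nothing says whether the third field is an RR interval or a QRS interval. A possible refinement, sketched below in plain Java without any Flink API, is a common event type that carries a tag naming its source. SensorEvent, MergeSketch, and the "RR"/"QRS" tags are hypothetical names that are not part of my program. In Flink the same idea would mean mapping each source to the common type before calling union, which accepts several streams at once (for example a.union(b, c)). The sort by timestamp is only there to make the illustration readable; Flink's union does not order events across its inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical common event type: one shape for every sensor, tagged by source.
class SensorEvent {
    final String kind;   // which sensor stream produced the event, e.g. "RR" or "QRS"
    final int sensorId;
    final long time;
    final int value;     // the measurement, e.g. RR interval or QRS interval

    SensorEvent(String kind, int sensorId, long time, int value) {
        this.kind = kind;
        this.sensorId = sensorId;
        this.time = time;
        this.value = value;
    }
}

class MergeSketch {

    // Merges any number of event lists into a single list ordered by timestamp.
    @SafeVarargs
    static List<SensorEvent> merge(List<SensorEvent>... streams) {
        List<SensorEvent> merged = new ArrayList<>();
        for (List<SensorEvent> stream : streams) {
            merged.addAll(stream);
        }
        merged.sort(Comparator.comparingLong((SensorEvent e) -> e.time));
        return merged;
    }

    public static void main(String[] args) {
        List<SensorEvent> rr = Arrays.asList(
                new SensorEvent("RR", 1, 10L, 810),
                new SensorEvent("RR", 1, 30L, 800));
        List<SensorEvent> qrs = Arrays.asList(
                new SensorEvent("QRS", 2, 20L, 95));

        // The tag survives the merge, so later logic can tell the sources apart.
        for (SensorEvent e : merge(rr, qrs)) {
            System.out.println(e.kind + " @" + e.time + " = " + e.value);
        }
    }
}
```

Because merge takes a variable number of lists, adding a third or fourth sensor only requires mapping it to the same event type first.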