I want to use Flink for a remote patient monitoring scenario that involves various sensors such as a gyroscope, an accelerometer, an ECG stream, a heart rate (HR) stream, an RR rate stream, etc. In this scenario the streams cannot be expected to share the same data type or input rate, but I still want to detect arrhythmia or other medical conditions, which requires doing CEP across these multiple sensors.
As far as I know, if I want to perform complex event processing (CEP) on these sensors, I have two options for combining them before the CEP step:
- Join the different streams
- Merge the different streams
Earlier I was performing a join based on the sensors' timestamps, but it does not join all the events: different streams can have different rates and different timestamps (in microseconds), so it is rare for two timestamps to be exactly equal.
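To illustrate the problem, here is a minimal sketch in plain Java (no Flink involved). The rates, the 137 microsecond offset, and the class and method names are all made up for illustration; they are not my real sensor settings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExactJoinDemo {

    // Timestamps in microseconds for a sensor that starts at startMicros
    // and emits one event every periodMicros.
    static List<Long> timestamps(long startMicros, long periodMicros, int count) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(startMicros + i * periodMicros);
        }
        return out;
    }

    // Counts events in b whose timestamp exactly equals a timestamp in a,
    // which is what an equality join on the timestamp would pair up.
    static int exactMatches(List<Long> a, List<Long> b) {
        Set<Long> seen = new HashSet<>(a);
        int matches = 0;
        for (Long t : b) {
            if (seen.contains(t)) {
                matches++;
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Hypothetical rates: one event every 4000 us vs. every 10000 us with a 137 us offset.
        List<Long> rr = timestamps(0L, 4_000L, 250);
        List<Long> qrs = timestamps(137L, 10_000L, 100);
        System.out.println(exactMatches(rr, qrs)); // prints 0
    }
}
```

With these assumed values the two sensors cover the same one-second span, yet an equality join on the timestamp pairs up none of the events.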
So I would like to go with option 2, i.e. performing a merge before doing CEP. According to the Flink documentation I can merge (union) two streams, but they must have the same data type. I tried to do this but was unsuccessful and got the following error:
Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)
Here is how I tried to perform the merge. I have two stream event classes with the following attributes:
RRIntervalStreamEvent Stream
public Integer Sensor_id;
public Long time;
public Integer RRInterval;
qrsIntervalStreamEvent Stream
public Integer Sensor_id;
public Long time;
public Integer qrsInterval;
Both of these streams have generator classes, which emit events of the corresponding type at the specified rate. Below is the code with which I tried to merge them.
// getting qrs interval stream
DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment.
addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");
// getting RR interval stream
DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment.
addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");
//merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});
I had to use new DataStream[]{qrs_stream_raw} because passing just qrs_stream_raw resulted in an error.
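Since the error says the two streams have different types, one direction I am considering is converting both event classes to a single common type before merging. Below is a rough sketch of that idea in plain Java, using lists instead of Flink streams. SensorReading and its kind field are hypothetical names I made up; in Flink I assume the conversion would be a map() on each stream before calling union(), but I have not verified that this is the recommended approach.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch {

    // Simplified copies of the two event classes from above.
    static class RRIntervalStreamEvent {
        public Integer Sensor_id;
        public Long time;
        public Integer RRInterval;
        RRIntervalStreamEvent(Integer id, Long time, Integer rr) {
            this.Sensor_id = id; this.time = time; this.RRInterval = rr;
        }
    }

    static class qrsIntervalStreamEvent {
        public Integer Sensor_id;
        public Long time;
        public Integer qrsInterval;
        qrsIntervalStreamEvent(Integer id, Long time, Integer qrs) {
            this.Sensor_id = id; this.time = time; this.qrsInterval = qrs;
        }
    }

    // Hypothetical common type: "kind" records which sensor the value came from,
    // so that information is not lost after the merge.
    static class SensorReading {
        final String kind;
        final Integer sensorId;
        final Long time;
        final Integer value;
        SensorReading(String kind, Integer sensorId, Long time, Integer value) {
            this.kind = kind; this.sensorId = sensorId; this.time = time; this.value = value;
        }
    }

    // Converts every event to the common type and puts them into one collection.
    // No ordering by time is applied here.
    static List<SensorReading> merge(List<RRIntervalStreamEvent> rr,
                                     List<qrsIntervalStreamEvent> qrs) {
        List<SensorReading> merged = new ArrayList<>();
        for (RRIntervalStreamEvent e : rr) {
            merged.add(new SensorReading("RR", e.Sensor_id, e.time, e.RRInterval));
        }
        for (qrsIntervalStreamEvent e : qrs) {
            merged.add(new SensorReading("QRS", e.Sensor_id, e.time, e.qrsInterval));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<RRIntervalStreamEvent> rr = List.of(new RRIntervalStreamEvent(1, 1000L, 800));
        List<qrsIntervalStreamEvent> qrs = List.of(new qrsIntervalStreamEvent(2, 1137L, 90));
        for (SensorReading r : merge(rr, qrs)) {
            System.out.println(r.kind + " " + r.sensorId + " " + r.time + " " + r.value);
        }
    }
}
```

I am not sure whether this is the right way to do it in Flink, or how well it scales to more than two sensor types, which is why I am asking the questions below.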
Can someone please give me an idea about the following?
- How should I merge these two streams?
- How should I merge more than two streams?
- Is there an engine that can merge more than two streams with different structures? If so, which engine should I use?