I have two data streams, for example:

ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10

I have created a CEP pattern that checks whether, within 4 seconds, the temperature sum (sumtemp) reaches 30 or more.

val pattern = Pattern.begin[Device]("start")
      .where(_.sumtemp >= 30)
      .within(Time.seconds(4))

Is there a way to join the output of this pattern stream with another incoming data stream to get the result below?

ts | custId | morethanthirty
1 | 1 | yes
2 | 4 | no

I would be really grateful if an example could be shared to do this.

1 Answer

There is more than one option. You could join your streams with a coGroup.

Example:

stream1.coGroup(stream2).where(<key-definition>).equalTo(<key-definition>).window(<window-assigner>).apply(new MyCoGroupFunction())

You can think of it as a join in SQL.

A small example for an implementation:

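import org.apache.flink.api.common.functions.RichCoGroupFunction
import org.apache.flink.util.Collector

class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {

      // Called once per key (and window) with all matching elements of both inputs
      override def coGroup(first: java.lang.Iterable[DataTypeOfStream1],
                         second: java.lang.Iterable[DataTypeOfStream2],
                         out: Collector[DataTypeOfOutput]): Unit = {

           out.collect(...)
           //your output

      }
}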
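
To make the skeleton above more concrete, here is a minimal sketch of a windowed coGroup that produces the yes/no flag from the question. It assumes the Flink 1.x Scala DataStream API (1.12 or later). The case classes Alert, Customer and Flagged are hypothetical names made up for illustration, and the alerts stream merely stands in for whatever DataStream your CEP pattern's select produces.

```scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical types for illustration; only ts and custId come from the question.
case class Alert(ts: Long, custId: Int)      // assumed: one element per CEP match
case class Customer(ts: Long, custId: Int)   // assumed: element of the second stream
case class Flagged(ts: Long, custId: Int, moreThanThirty: String)

class FlagCustomers extends CoGroupFunction[Customer, Alert, Flagged] {
  // Called per key and window; either side may be empty.
  override def coGroup(customers: java.lang.Iterable[Customer],
                       alerts: java.lang.Iterable[Alert],
                       out: Collector[Flagged]): Unit = {
    // "yes" if at least one CEP match exists for this custId in the window
    val flag = if (alerts.iterator().hasNext) "yes" else "no"
    customers.asScala.foreach(c => out.collect(Flagged(c.ts, c.custId, flag)))
  }
}

object CoGroupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Toy inputs; ts is treated as seconds and converted to milliseconds.
    val customers = env.fromElements(Customer(1L, 1), Customer(2L, 4))
      .assignAscendingTimestamps(_.ts * 1000)
    val alerts = env.fromElements(Alert(1L, 1))
      .assignAscendingTimestamps(_.ts * 1000)

    customers.coGroup(alerts)
      .where(_.custId)
      .equalTo(_.custId)
      .window(TumblingEventTimeWindows.of(Time.seconds(4)))
      .apply(new FlagCustomers)
      .print()

    env.execute("coGroup sketch")
  }
}
```

Unlike an inner join, coGroup also calls the function when one side is empty, which is what makes the "no" rows possible. The 4-second tumbling window is only an assumption chosen to mirror the pattern's within clause; a sliding or session window may fit your data better.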

You could also use state, if needed.

There are also other options for combining two streams, such as

  • union (if the streams to be combined have the same data type)
  • connect
  • coFlatMap

The differences between the methods are minor imho.
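
As a rough illustration of the connect and coFlatMap options combined with state, the sketch below remembers per custId whether a CEP match has been seen and flags records of the second stream accordingly. It assumes the Flink 1.x Scala DataStream API. The types Alert, Customer and Flagged and the class FlagWithState are hypothetical names, and alerts again stands in for the output of your pattern.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical types for illustration; only ts and custId come from the question.
case class Alert(ts: Long, custId: Int)
case class Customer(ts: Long, custId: Int)
case class Flagged(ts: Long, custId: Int, moreThanThirty: String)

class FlagWithState extends RichCoFlatMapFunction[Alert, Customer, Flagged] {
  // Keyed state: has a CEP match been seen for the current custId?
  @transient private var alerted: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    alerted = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("alerted", classOf[java.lang.Boolean]))
  }

  // Input 1: a CEP match arrives, remember it for this key.
  override def flatMap1(alert: Alert, out: Collector[Flagged]): Unit =
    alerted.update(true)

  // Input 2: emit the record with the flag known so far.
  override def flatMap2(c: Customer, out: Collector[Flagged]): Unit = {
    val seen = alerted.value() // null until the first alert for this key
    val flag = if (seen != null && seen.booleanValue()) "yes" else "no"
    out.collect(Flagged(c.ts, c.custId, flag))
  }
}

object ConnectSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val alerts = env.fromElements(Alert(1L, 1))
    val customers = env.fromElements(Customer(1L, 1), Customer(2L, 4))

    alerts.connect(customers)
      .keyBy(_.custId, _.custId) // both inputs must be keyed the same way
      .flatMap(new FlagWithState)
      .print()

    env.execute("connect sketch")
  }
}
```

Note that Flink gives no ordering guarantee between the two inputs, so a record that arrives before its alert is flagged "no" in this sketch. If that matters, buffering records in state and using timers, or the windowed coGroup, is probably the safer choice.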

See https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ for more information.