This is a pretty basic question about connected keyed streams.

If I have two streams of related events that share the same logical key, and these streams are connected (logically joined on that key), and everything runs with parallelism > 1, how does Flink guarantee that two events from different streams with the same logical key end up in the same parallel operator instance?

Here is a made-up example about a hospital's patient streams: a temperature stream and a heartbeat stream. We want to join these two streams by patient id using ConnectedStreams and a CoFlatMapFunction.

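DataStream<PatientTemperature> temperatureStream = ..
DataStream<PatientHeartbeat> heartbeatStream = ..

temperatureStream
   .keyBy(pt -> pt.getPatientId())
   .connect(heartbeatStream.keyBy(hbt -> hbt.getPatientId()))
   .flatMap(new RichCoFlatMapFunction<PatientTemperature, PatientHeartbeat, PatientTemperatureAndHeartBeat>() {

      // Keyed state: Flink keeps one value per patient id and scopes access to the current key
      private transient ValueState<PatientTemperatureAndHeartBeat> state;

      @Override
      public void open(Configuration parameters) {
         // open(Configuration) is the Flink 1.x signature; recent releases use open(OpenContext) instead
         state = getRuntimeContext().getState(new ValueStateDescriptor<>(
               "temperatureAndHeartBeat", PatientTemperatureAndHeartBeat.class));
      }

      // Returns this patient's stored value, or a fresh one on first access (state.value() is null then)
      private PatientTemperatureAndHeartBeat current() throws Exception {
         PatientTemperatureAndHeartBeat stored = state.value();
         return stored != null ? stored : new PatientTemperatureAndHeartBeat();
      }

      @Override
      public void flatMap1(PatientTemperature value, Collector<PatientTemperatureAndHeartBeat> out) throws Exception {
         PatientTemperatureAndHeartBeat joined = current();
         joined.setTemperature(value);
         state.update(joined); // write back, otherwise the change is lost
      }

      @Override
      public void flatMap2(PatientHeartbeat value, Collector<PatientTemperatureAndHeartBeat> out) throws Exception {
         PatientTemperatureAndHeartBeat joined = current();
         joined.setHeartBeat(value);
         state.update(joined);
         out.collect(joined);
      }
   });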

Assume this is running with parallelism = 3, with operator tasks A, B and C, each running on a different physical machine.

Flink guarantees that all temperature events for patient "JohnDoe" end up in the same parallel operator instance. Say they end up in operator B.

But when Flink receives heartbeat events for "JohnDoe", how does it know to send them to operator B, where the patient's temperature events were sent? Unless both the temperature and the heartbeat events are sent to the same parallel operator instance, the join would not work.
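To make the concern concrete, here is a small plain-Java simulation with no Flink involved. `JoinInstance`, `join` and both routing functions are hypothetical names for illustration only. Each simulated operator instance keeps its own local map as state, so a heartbeat can only find the matching temperature if both records were routed to the same instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

public class LocalStateJoinSketch {

    // One simulated instance of the join operator; its state is a local map, invisible to other instances
    static final class JoinInstance {
        private final Map<String, Double> lastTemperature = new HashMap<>();

        void onTemperature(String patientId, double celsius) {
            lastTemperature.put(patientId, celsius);
        }

        // Returns a joined line only if this instance has already seen a temperature for the patient
        String onHeartbeat(String patientId, int bpm) {
            Double t = lastTemperature.get(patientId);
            return t == null ? null : patientId + ": " + t + " C, " + bpm + " bpm";
        }
    }

    // Sends one temperature and then one heartbeat for the same patient through the given routers
    static String join(String patientId, ToIntFunction<String> routeTemperature,
                       ToIntFunction<String> routeHeartbeat, int parallelism) {
        List<JoinInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new JoinInstance());
        }
        instances.get(routeTemperature.applyAsInt(patientId)).onTemperature(patientId, 38.2);
        return instances.get(routeHeartbeat.applyAsInt(patientId)).onHeartbeat(patientId, 92);
    }

    public static void main(String[] args) {
        int parallelism = 3; // instances A, B, C -> indices 0, 1, 2
        ToIntFunction<String> byHash = id -> Math.floorMod(id.hashCode(), parallelism);
        // Hypothetical "wrong" router: always picks the instance after the hash-chosen one
        ToIntFunction<String> shifted = id -> (byHash.applyAsInt(id) + 1) % parallelism;

        System.out.println("same routing:      " + join("JohnDoe", byHash, byHash, parallelism));
        System.out.println("different routing: " + join("JohnDoe", byHash, shifted, parallelism));
    }
}
```

With the same router on both inputs the heartbeat finds the stored temperature and a joined line is printed; with the shifted router the heartbeat lands on an instance that has never seen "JohnDoe", so the result is `null`.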

The fact that both streams use the same logical key (i.e., the patient id) is application-specific, and Flink does not know about it. The two connected streams could just as well use their own keys, unrelated to each other.

1 Answer

Of course, the choice of keys is application-specific. However, Flink knows how to access the keys because you provide key-selector functions (pt -> pt.getPatientId() and hbt -> hbt.getPatientId()). Flink ensures that the keys of both streams have the same type and applies the same hash function to the keys of both streams to determine which parallel instance each record is sent to.
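As a rough illustration of that routing, here is a plain-Java sketch with no Flink dependency. It mirrors the two steps Flink's `KeyGroupRangeAssignment` performs (key to key group, then key group to operator index), but the `mix` function is a placeholder, not Flink's actual hash, `MAX_PARALLELISM = 128` is an assumed value, and the record types are hypothetical stand-ins for the question's classes. What it shows is that the target index depends only on the key value, never on which input stream a record came from.

```java
import java.util.List;

public class KeyRoutingSketch {

    // Hypothetical stand-ins for the question's PatientTemperature / PatientHeartbeat
    record PatientTemperature(String patientId, double celsius) {}
    record PatientHeartbeat(String patientId, int bpm) {}

    // Assumed number of key groups for this sketch; not read from any Flink configuration
    static final int MAX_PARALLELISM = 128;

    // Placeholder bit mixer standing in for the hash Flink applies to key.hashCode()
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    // Step 1: key -> key group in [0, MAX_PARALLELISM); step 2: key group -> operator index in [0, parallelism)
    static int operatorIndex(Object key, int parallelism) {
        int keyGroup = Math.floorMod(mix(key.hashCode()), MAX_PARALLELISM);
        return keyGroup * parallelism / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        int parallelism = 3; // operator instances A, B, C -> indices 0, 1, 2
        List<PatientTemperature> temps = List.of(
                new PatientTemperature("JohnDoe", 38.2), new PatientTemperature("JaneRoe", 36.9));
        List<PatientHeartbeat> beats = List.of(
                new PatientHeartbeat("JohnDoe", 92), new PatientHeartbeat("JaneRoe", 71));

        // Each stream has its own key selector, but both yield a String patient id
        for (PatientTemperature t : temps) {
            System.out.println("temperature " + t.patientId() + " -> instance "
                    + operatorIndex(t.patientId(), parallelism));
        }
        for (PatientHeartbeat b : beats) {
            System.out.println("heartbeat   " + b.patientId() + " -> instance "
                    + operatorIndex(b.patientId(), parallelism));
        }
    }
}
```

Whichever instance "JohnDoe" hashes to, its temperature and heartbeat lines report the same index. This is also why the key types must match: a `String` "42" and a `Long` 42 have different `hashCode()` values, so they would generally be routed differently.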

Hence, records from both streams that have the same key value are shipped to the same operator instance.