Suppose I have two Task Managers, and each one has just one task slot. Now, I have the following job:
```java
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
```
One Task Manager will consume data from a Kafka topic, and the other one will consume data from another Kafka topic.
I submit the job to the Job Manager. Flink assigns both Task Managers to the flatMap (since each Task Manager has just one task slot).
The flatMap performs a simple join between the events (using two keyed states):
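```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {

    // Keyed state: at most one buffered A event and one buffered B event per key
    private ValueState<A> AState;
    private ValueState<B> BState;

    @Override
    public void open(Configuration config) {
        AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
        BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
    }

    // Called for each event of the first input (streamA)
    @Override
    public void flatMap1(A event, Collector<String> out) throws Exception {
        B secondEvent = BState.value();
        if (secondEvent == null) {
            AState.update(event);   // no matching B yet: buffer this A
        } else {
            out.collect(...);       // emit the joined result
            BState.clear();         // the buffered B has been consumed
        }
    }

    // Called for each event of the second input (streamB); the parameter type is B, not A
    @Override
    public void flatMap2(B event, Collector<String> out) throws Exception {
        A firstEvent = AState.value();
        if (firstEvent == null) {
            BState.update(event);   // no matching A yet: buffer this B
        } else {
            out.collect(...);       // emit the joined result
            AState.clear();         // the buffered A has been consumed
        }
    }
}
```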
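To make the intended per-key behaviour concrete without a Flink cluster, here is a minimal plain-Java sketch of the same join logic. It is a simplified model, not Flink code: the hypothetical class `JoinModel` uses two `HashMap`s keyed by the join key in place of the two keyed `ValueState`s, and `onA`/`onB` stand in for `flatMap1`/`flatMap2`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of StatefulJoinFunction.
// The two maps play the role of the keyed ValueStates: one pending event per side and key.
public class JoinModel {
    private final Map<String, String> aState = new HashMap<>();
    private final Map<String, String> bState = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Mirrors flatMap1: an A event for `key` arrives
    void onA(String key, String a) {
        String b = bState.get(key);
        if (b == null) {
            aState.put(key, a);               // no matching B yet: buffer this A
        } else {
            output.add(key + ":" + a + "+" + b);
            bState.remove(key);               // the buffered B has been consumed
        }
    }

    // Mirrors flatMap2: a B event for `key` arrives
    void onB(String key, String b) {
        String a = aState.get(key);
        if (a == null) {
            bState.put(key, b);               // no matching A yet: buffer this B
        } else {
            output.add(key + ":" + a + "+" + b);
            aState.remove(key);               // the buffered A has been consumed
        }
    }

    public static void main(String[] args) {
        JoinModel join = new JoinModel();
        join.onA("1", "a1");
        join.onB("2", "b2");
        join.onB("1", "b1");
        join.onA("2", "a2");
        System.out.println(join.output);      // prints [1:a1+b1, 2:a2+b2]
    }
}
```

Like the original function, the model keeps at most one pending event per side and key, so a second A event for the same key simply overwrites the first one.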
If I have understood correctly, after the connect method the two streams become a single one. The implemented flatMap needs to share state, since the operator must check whether the corresponding event has already arrived before applying the join, but it is executed with a parallelism of two, and therefore on both Task Managers. This means that a Task Manager would have to write into the state of the other Task Manager (which is shared after the connect method) every time a state must be updated, or at least read it. How do the Task Managers communicate, then? Does this affect performance, since the Task Managers may run on different cluster nodes?
EDIT: I have found the following article on the Flink blog, and it seems that two Task Managers can communicate through a TCP connection. That makes sense to me, since there are some cases in which state must be shared between events. If this is wrong, can you please explain how Flink handles the following scenario?
Suppose, as before, that there are two Task Managers, physically located on two cluster nodes, each with just one slot. I run the above job with a parallelism of 2 (using, for example, the -p parameter when submitting the job to the Job Manager). Flink will create two structurally identical subtasks from my job and send them to the Task Managers. Both Task Managers will execute the "same" job, but on different events.

The job consumes events from two Kafka topics, A and B. This means that the first and the second Task Manager will both consume from topics A and B, but different events, otherwise there would be duplicates. Since the job is the same, i.e. it executes the above RichCoFlatMapFunction, each Task Manager proceeds locally with its own set of consumed events and its own local state.

Now comes the problem: suppose that the first Task Manager has consumed an event with key "1". This event reaches the RichCoFlatMapFunction and is stored in the state, since the operator is still waiting for another event with the same key to produce the join. If the other event with key "1" is consumed by the second Task Manager, and the two Task Managers neither share their state nor communicate, the join will be impossible. What is wrong in my reasoning?
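For reference, a `keyBy` exchange routes each record to a downstream subtask chosen from its key alone, independently of which source subtask or topic produced it. The sketch below is a simplified, Flink-independent model of that property, under a stated assumption: the hypothetical `targetSubtask` just takes `hashCode()` modulo the parallelism, whereas Flink actually hashes the key into a key group (bounded by the maximum parallelism) and then maps key groups to subtasks. The property it illustrates, that equal keys always land on the same subtask, holds in both cases.

```java
import java.util.List;

// Hypothetical, simplified model of keyed routing (NOT Flink's exact algorithm):
// the target subtask is a pure function of the key and the parallelism.
public class KeyRouting {
    static int targetSubtask(String key, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // {where the record was read, its key}: the same key read from different topics/source subtasks
        List<String[]> records = List.of(
                new String[] {"topic A, source subtask 0", "1"},
                new String[] {"topic B, source subtask 1", "1"},
                new String[] {"topic A, source subtask 1", "2"},
                new String[] {"topic B, source subtask 0", "2"});
        for (String[] r : records) {
            System.out.println(r[0] + ", key " + r[1]
                    + " -> flatMap subtask " + targetSubtask(r[1], parallelism));
        }
    }
}
```

With this model, both records with key "1" go to subtask 1 and both records with key "2" go to subtask 0, regardless of which topic or source subtask read them.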