Suppose I have two Task Managers, and each one has just one task slot. Now, I have the following job:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);

One Task Manager will consume data from a Kafka topic, and the other one will consume data from another Kafka topic.

I submit the job to the Job Manager for execution. Flink allocates both Task Managers to run the flatMap, since each Task Manager has just one task slot.

The flatMap makes a simple join between the events (using two keyed-states):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(B event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }

If I have understood correctly, after the connect method there is only one stream. The flatMap needs shared state, since the operator must check whether the corresponding event has already arrived before applying the join, but it runs with a parallelism of two, and therefore on both Task Managers. This means that a Task Manager would have to write into the state held by the other Task Manager (which is shared after the connect method) each time the state must be updated, or it may simply need to read that state. How do the Task Managers communicate, then? Does this affect performance, since Task Managers may run on different cluster nodes?

EDIT: I have found the following article on the Flink blog, and it seems that two Task Managers can communicate through a TCP connection, which makes sense to me, since there are some cases in which we need to share state between events. If this is wrong, can you please explain to me how Flink manages the following scenario?

Suppose again that there are two Task Managers, physically located on two cluster nodes, and that each Task Manager has just one slot. I run the above job with a parallelism of 2 (using, for example, the -p parameter when submitting the job to the Job Manager). Flink will create two subtasks from my job, which are structurally the same, and send them to the Task Managers. Both Task Managers will execute the "same" job, but consume different events. The job consumes events from two Kafka topics: A and B. This means that the first and the second Task Manager will each consume from both topic A and topic B, but different events, otherwise there would be duplicates. The job is the same, i.e. it executes the above RichCoFlatMapFunction, so each Task Manager will proceed locally with its own set of consumed events and its own local state. Now comes the problem: suppose that the first Task Manager has consumed an event with the key "1". This event arrives in the RichCoFlatMapFunction and is stored in the state, since the operator is still waiting for another event with the same key to produce the join. If the other event with key "1" is consumed by the second Task Manager, and the two don't share their state or communicate, it will be impossible to make the join. What is wrong in my reasoning?

1 Answer

There is no need for the two task managers to communicate for the purpose of state sharing -- there is no state sharing in Flink.

Any of the three execution graphs shown below is possible, depending on the details of how you arrange the sources. At the left of each figure we see source operators for A and B, and at the right, the two parallel instances of a two-input operator implementing a join via a RichCoFlatMap.

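[Figure: three possible execution graphs, each connecting the A and B source operators (left) to the two parallel join instances, Join1 and Join2 (right)]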

The keyBy isn't an operator, but instead specifies how the sources and the two RichCoFlatMap instances are connected. It arranges for this to be a hashed connection that does a repartitioning of the source streams.

It doesn't much matter which of these three scenarios is employed, because in all three cases, the keyBy will have the same effect of steering all events for some keys to Join1, and all events for the other keys to Join2.
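To make the routing concrete, here is a minimal plain-Java sketch of the idea. It is not Flink code: the class `KeyRoutingSketch` and the method `subtaskFor` are hypothetical names, and the plain hash-modulo rule is a simplifying assumption (Flink's real assignment is more involved and goes through key groups). What it shares with the real thing is that the target instance depends only on the key, never on which source or which task manager produced the event.

```java
import java.util.List;

// Simplified stand-in for keyBy routing (hypothetical class, not part of Flink).
class KeyRoutingSketch {

    // Picks one of `parallelism` downstream instances from the key alone.
    // Assumption: plain hash modulo; Flink itself uses a more elaborate scheme.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // Each entry is {source, key}; the source plays no part in the routing.
        List<String[]> events = List.of(
                new String[] {"A", "1"},
                new String[] {"B", "2"},
                new String[] {"B", "1"},
                new String[] {"A", "2"});
        for (String[] e : events) {
            int target = subtaskFor(e[1], parallelism);
            System.out.println("event from " + e[0] + " with key " + e[1]
                    + " goes to Join" + (target + 1));
        }
    }
}
```

Whichever source instance reads an event, the A event and the B event for a given key end up at the same join instance, which is why the three graphs behave the same.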

In other words, for any given key, all of the events for that key will be processed in the same task slot. You can think of ValueState<A> as a distributed (sharded) key/value store, where the values are of type A. Each task manager has the state for a slice of that key/value store (for a disjoint subset of the keys), and handles all of the events for those keys (and only those keys).

For example: In flatMap1, when BState.value() is called with an element from streamA, the Flink runtime will access the value of BState for the key that is currently in context, meaning the value associated with the key for the event from streamA currently being processed. This state will always be local, in the current task. Similarly, flatMap2 will always be called with elements from streamB, and AState.value() will return the value for the key of the streamB element being processed.
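The following plain-Java simulation sketches the scenario from the question, with an A event and a B event for key "1" arriving separately. It does not use Flink: `LocalKeyedStateSketch`, `JoinSubtask`, `onA`, `onB`, `subtaskFor` and `run` are all hypothetical names, the two maps per instance merely stand in for AState and BState, and the hash-modulo routing is a simplifying assumption rather than Flink's actual partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of two parallel join instances,
// each holding state only for the keys that are routed to it.
class LocalKeyedStateSketch {

    // One parallel instance of the join; the maps stand in for AState and BState.
    static class JoinSubtask {
        final Map<String, String> aState = new HashMap<>();
        final Map<String, String> bState = new HashMap<>();
        final List<String> output = new ArrayList<>();

        // Mirrors flatMap1: read (and clear) the B state for the current key only.
        void onA(String key, String event) {
            String b = bState.remove(key);
            if (b == null) {
                aState.put(key, event);       // no match yet, remember the A event
            } else {
                output.add(event + "+" + b);  // match found in local state
            }
        }

        // Mirrors flatMap2: read (and clear) the A state for the current key only.
        void onB(String key, String event) {
            String a = aState.remove(key);
            if (a == null) {
                bState.put(key, event);
            } else {
                output.add(a + "+" + event);
            }
        }
    }

    // Assumed routing rule: the target instance depends on the key alone.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Feeds three events through two join instances and returns the instances.
    static JoinSubtask[] run() {
        JoinSubtask[] subtasks = {new JoinSubtask(), new JoinSubtask()};
        subtasks[subtaskFor("1", 2)].onA("1", "a1");
        subtasks[subtaskFor("2", 2)].onB("2", "b2");
        subtasks[subtaskFor("1", 2)].onB("1", "b1"); // same instance as "a1"
        return subtasks;
    }

    public static void main(String[] args) {
        JoinSubtask[] subtasks = run();
        for (int i = 0; i < subtasks.length; i++) {
            System.out.println("Join" + (i + 1) + " output=" + subtasks[i].output
                    + " pending A=" + subtasks[i].aState
                    + " pending B=" + subtasks[i].bState);
        }
    }
}
```

Neither instance ever reads the other's maps: the join for key "1" completes entirely inside the instance that owns that key, while the unmatched B event for key "2" waits in the other one.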

This design avoids any coupling between the task managers, which is good for scalability and performance.