
I am having trouble understanding how operator state is distributed among parallel operator instances to get the desired result.

Here is a simple example:

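import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

class MultiplyNumber implements FlatMapFunction<Integer, Integer> {

     // This is the operator state (defined statically here for simplicity, but assume it can be updated dynamically from a control stream)
     List<Integer> multipliers = Arrays.asList(2, 3, 4, 5);

     @Override
     public void flatMap(Integer value, Collector<Integer> out) {
           // Emit one product per multiplier
           for (Integer multiplier : multipliers) {
                  out.collect(multiplier * value);
           }
     }
}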

Assume we are running with parallelism = 2, which means there are 2 parallel instances of the operator (MultiplyNumber): Operator1 and Operator2.

If the operator state (i.e. the list of multipliers) is distributed among these instances, then, say, Operator1 has 2 and 3 as multipliers, and Operator2 has 4 and 5.

Now assume the input is a keyed stream of integers. The key for all even numbers is "Even" and the key for all odd numbers is "Odd".

Suppose Flink sends all even numbers to Operator1 and all odd numbers to Operator2 (or vice versa).

This means that all even numbers would be multiplied by 2 and 3, and all odd numbers by 4 and 5.

But this is not the outcome I expect. I would expect every number to be multiplied by 2, 3, 4 and 5, which is the result when the parallelism is 1.


Answer


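First of all, the multipliers list in your example is not Flink state. It is an ordinary Java field local to each operator instance: every parallel instance works on its own full copy of the function object, so the list is never split between instances, and with the static list shown both instances multiply every number by 2, 3, 4 and 5. If you want to use Flink's state, I recommend reading through this section of the docs.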
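To see why a plain field is not split, here is a minimal plain-Java sketch (no Flink dependency) that mimics deployment: the function object is serialized once and each "parallel instance" deserializes its own copy. The class names ParallelCopiesDemo and Multiplier are hypothetical, and the serialization round trip is only an approximation of how Flink ships user functions to its tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelCopiesDemo {

    // Hypothetical stand-in for MultiplyNumber without the Flink interfaces; the list is a plain field.
    static class Multiplier implements Serializable {
        List<Integer> multipliers = new ArrayList<>(Arrays.asList(2, 3, 4, 5));

        List<Integer> apply(int value) {
            List<Integer> out = new ArrayList<>();
            for (int m : multipliers) {
                out.add(m * value);
            }
            return out;
        }
    }

    // Serialize and deserialize an object, producing an independent deep copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Multiplier prototype = new Multiplier();
        Multiplier operator1 = copy(prototype); // handles the "Even" key
        Multiplier operator2 = copy(prototype); // handles the "Odd" key

        // Each copy holds the full list, so each number gets all four products.
        System.out.println(operator1.apply(6)); // [12, 18, 24, 30]
        System.out.println(operator2.apply(7)); // [14, 21, 28, 35]

        // The copies are independent: changing one does not affect the other.
        operator1.multipliers.add(10);
        System.out.println(operator2.multipliers); // [2, 3, 4, 5]
    }
}
```

The last lines also hint at the real problem with a dynamic list: an update applied to one instance's copy is invisible to the others.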

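In your case, I think the most useful type of state would be BroadcastState, introduced in Flink 1.5, which was designed specifically for the use case you've described: every element of a control stream is broadcast to all parallel instances, so each instance keeps a complete copy of the multipliers.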
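A minimal sketch of the broadcast state pattern applied to this example, assuming Flink 1.5 or later with KeyedBroadcastProcessFunction available. The control stream, the class names BroadcastMultiply and MultiplyFunction, and storing each multiplier as both map key and value are illustrative assumptions. The two fromElements sources are not ordered relative to each other, so numbers that arrive before the multipliers will see an empty or partial set; a real job would need to handle that.

```java
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastMultiply {

    // Broadcast state is always map-shaped; here multiplier -> multiplier.
    static final MapStateDescriptor<Integer, Integer> MULTIPLIERS =
            new MapStateDescriptor<>("multipliers", Types.INT, Types.INT);

    // Key type String ("Even"/"Odd"), data Integer, control Integer, output Integer.
    static class MultiplyFunction
            extends KeyedBroadcastProcessFunction<String, Integer, Integer, Integer> {

        @Override
        public void processElement(Integer value, ReadOnlyContext ctx, Collector<Integer> out) throws Exception {
            // Read-only access: every instance sees all multipliers received so far.
            for (Map.Entry<Integer, Integer> e : ctx.getBroadcastState(MULTIPLIERS).immutableEntries()) {
                out.collect(e.getValue() * value);
            }
        }

        @Override
        public void processBroadcastElement(Integer multiplier, Context ctx, Collector<Integer> out) throws Exception {
            // Invoked on every parallel instance for each control element.
            ctx.getBroadcastState(MULTIPLIERS).put(multiplier, multiplier);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Hypothetical control stream carrying the multipliers.
        DataStream<Integer> control = env.fromElements(2, 3, 4, 5);
        BroadcastStream<Integer> broadcastMultipliers = control.broadcast(MULTIPLIERS);

        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4);

        numbers
                .keyBy(v -> v % 2 == 0 ? "Even" : "Odd")
                .connect(broadcastMultipliers)
                .process(new MultiplyFunction())
                .print();

        env.execute("broadcast-state multiply");
    }
}
```

Because processBroadcastElement runs on every parallel instance, each instance ends up holding all four multipliers, while processElement only receives the numbers for the keys assigned to that instance. The result therefore matches the parallelism-1 behaviour.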