I have a streaming application like this one:

DataStream<MyObject> stream1 = source
                .keyBy("clientip")
                .flatMap(new MyFlatMapFunction())
                .name("Stream1");

//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {

    private transient ValueState<Boolean> valueState;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(12))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .cleanupInBackground()
                .build();

        ValueStateDescriptor<Boolean> valueStateDescriptor = new ValueStateDescriptor<>(
                "valueState",
                Types.BOOLEAN);

        valueStateDescriptor.enableTimeToLive(ttlConfig);
        // getState() takes the descriptor, not the (still null) state field
        valueState = getRuntimeContext().getState(valueStateDescriptor);
    }

    @Override
    public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
    {
       // get value from value state, check if it is matched with something
       // if matches some condition, then collector.collect(myObject)
       // update state for each myObject
    }
}

Note: there are 3 workers on 3 different machines, each with a parallelism of 16. Total parallelism is 48.

When implementing this code, I always assumed that "if IP address 1.2.3.4 matches the condition, then subsequent requests from the same IP address 1.2.3.4 always match the condition until the state is cleared". Is this statement correct?

From what I understand of the Flink docs, if IP address 1.2.3.4 goes to machine1 (based on the hash value of clientip), do all requests from IP address 1.2.3.4 always go to machine1?

The open() method is called once inside the TaskManager JVM. Therefore Flink creates 48 instances of the flatMap operator (1-16 of 48 reside on machine1, 17-32 of 48 on machine2, 33-48 of 48 on machine3), and each flatMap instance runs the open() method. Does that mean open() runs 48 times?

In the end, all 48 instances access the same state but see different values (because state is local). I mean, one group of instances (say, the 16 instances on machine1) will get the same state value.

Lastly, if there were no keyBy before the flatMap, could requests from IP address 1.2.3.4 go to machine1, machine2, or machine3 at random?


1 Answer

  1. Since you do a keyBy("clientip"), all records with the same value for that field will be processed by the same MyFlatMapFunction sub-task. So the set of all records is partitioned to the 48 sub-tasks, and assuming the counts of IP addresses are evenly distributed, each sub-task will get roughly 1/48 of all records.
  2. Yes, there will be 48 instances of MyFlatMapFunction instantiated, and thus 48 calls to open().
  3. "All 48 instances access the same state": no. State is scoped to each unique key, so it is partitioned among the 48 sub-tasks by key value. A sub-task only ever sees the state of the keys routed to it.
  4. If there is no keyBy(), then each sub-task of the MyFlatMapFunction operator will get whatever data is in the partition from the source. That then depends on your data source, e.g. if you're reading from a Kafka topic, and that topic has 48 partitions, then there's a 1-to-1 mapping from Kafka partition to MyFlatMapFunction sub-task. If you have fewer than 48 Kafka partitions, some of your MyFlatMapFunction sub-tasks won't get any data. If you want to redistribute incoming records to all sub-tasks, then you can do a rebalance(). But note then that you won't be able to maintain per-IP address state.
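
To make points 1, 3 and 4 concrete, here is a minimal plain-Java sketch of how a key could be mapped to a sub-task, and how that differs from a round-robin distribution. It is an illustration under stated assumptions, not Flink's actual code: the class and method names (KeyedRoutingSketch, keyGroupFor, subtaskFor, roundRobinSubtask) are made up for this example, a plain floorMod stands in for the murmur hash Flink applies to the key's hashCode(), and a max parallelism of 128 is assumed.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedRoutingSketch {

    // Simplified stand-in for key-group assignment. Flink additionally applies a
    // murmur hash to key.hashCode(); floorMod is used here only for illustration.
    public static int keyGroupFor(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps the key's key group to a sub-task index in [0, parallelism).
    public static int subtaskFor(String key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Round-robin distribution, roughly what rebalance() does: the key is ignored.
    public static int roundRobinSubtask(long recordNumber, int parallelism) {
        return (int) (recordNumber % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 48;     // 3 machines x 16 slots, as in the question
        int maxParallelism = 128; // assumed value for this sketch

        // One map per sub-task, mimicking keyed state that is local to a sub-task.
        Map<Integer, Map<String, Boolean>> statePerSubtask = new HashMap<>();

        String[] ips = {"1.2.3.4", "5.6.7.8", "1.2.3.4", "9.9.9.9", "1.2.3.4"};
        for (String ip : ips) {
            int subtask = subtaskFor(ip, maxParallelism, parallelism);
            // The same IP always reaches the same sub-task and the same state entry.
            statePerSubtask.computeIfAbsent(subtask, s -> new HashMap<>()).put(ip, true);
            System.out.println(ip + " -> keyed sub-task " + subtask);
        }

        // Without keyBy, after a round-robin redistribution, consecutive records
        // with the same IP land on different sub-tasks.
        for (long i = 0; i < 3; i++) {
            System.out.println("1.2.3.4 record " + i + " -> round-robin sub-task "
                    + roundRobinSubtask(i, parallelism));
        }
    }
}
```

The point of the sketch is that the keyed mapping is a pure function of the key, so every record for 1.2.3.4 reaches the same sub-task (and therefore the same machine) for as long as the parallelism stays the same, while the round-robin mapping depends only on arrival order, which is why per-IP state cannot be maintained without keyBy().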