I have a stream application like this one:
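```java
DataStream<MyObject> stream1 = source
        .keyBy("clientip") // field-expression keyBy (deprecated in newer Flink versions in favour of a KeySelector)
        .flatMap(new MyFlatMapFunction())
        .name("Stream1");
//...
```

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {

    private transient ValueState<Boolean> valueState;

    @Override
    public void open(Configuration parameters) {
        // Entries expire 12 minutes after they were created or last written.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(12))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .cleanupInBackground()
                .build();

        ValueStateDescriptor<Boolean> valueStateDescriptor = new ValueStateDescriptor<>(
                "valueState",
                Types.BOOLEAN);
        valueStateDescriptor.enableTimeToLive(ttlConfig);

        // Register the state through its descriptor (not through the ValueState field itself).
        valueState = getRuntimeContext().getState(valueStateDescriptor);
    }

    @Override
    public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception {
        // get value from value state, check if it is matched with something
        // if matches some condition, then collector.collect(myObject)
        // update state for each myObject
    }
}
```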
Note: there are 3 workers on 3 different machines, each running 16 parallel instances, so the total parallelism is 48.
When implementing this code, I assumed that "if IP address 1.2.3.4 matches the condition, then subsequent requests from the same IP address 1.2.3.4 will always match the condition until the state is cleared". Is this statement correct?
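For reference, here is a plain-Java model (not Flink code) of how a TTL-guarded value behaves with `UpdateType.OnCreateAndWrite`: every write resets the 12-minute timer, while reads do not. Since the function above updates the state for every record, the entry for an IP only expires 12 minutes after the *last* write for that IP, assuming nothing overwrites the stored value in between. The model treats expired values as absent, which corresponds to `NeverReturnExpired`; with the `ReturnExpiredIfNotCleanedUp` setting used above, Flink may still return an expired value until cleanup removes it. As far as I know, Flink's state TTL is based on processing time. The class name `TtlValueModel` and the explicit timestamps are assumptions made for illustration.

```java
import java.time.Duration;

// Plain-Java model of a TTL-guarded value, loosely following StateTtlConfig with
// UpdateType.OnCreateAndWrite and NeverReturnExpired-style visibility.
// Timestamps are passed in explicitly so the behaviour is deterministic.
public class TtlValueModel {
    private final long ttlMillis;
    private Boolean value;         // null means "no value / expired"
    private long lastWriteMillis;  // refreshed on every write (OnCreateAndWrite)

    public TtlValueModel(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    // Creating or updating the value resets the TTL timer.
    public void update(Boolean newValue, long nowMillis) {
        this.value = newValue;
        this.lastWriteMillis = nowMillis;
    }

    // Reads do not refresh the timer; an expired value is treated as absent.
    public Boolean value(long nowMillis) {
        if (value != null && nowMillis >= lastWriteMillis + ttlMillis) {
            value = null;
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueModel state = new TtlValueModel(Duration.ofMinutes(12));
        long minute = 60_000L;

        state.update(true, 0);                          // 1.2.3.4 matches at t = 0 min
        System.out.println(state.value(11 * minute));   // true: 11 minutes since last write

        state.update(true, 11 * minute);                // write at t = 11 min resets the timer
        System.out.println(state.value(20 * minute));   // true: 9 minutes since last write

        System.out.println(state.value(23 * minute));   // null: 12 minutes since last write
    }
}
```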
From what I understand of the Flink docs, if IP address 1.2.3.4 is routed to machine1 (based on the hash of clientip), then all requests from IP address 1.2.3.4 always go to machine1. Is that right?
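The routing can be sketched in two steps: the key's `hashCode()` is mixed and reduced to a *key group* in `[0, maxParallelism)`, and the key group is then mapped to a subtask index in `[0, parallelism)`. The code below is modeled on Flink's `KeyGroupRangeAssignment` (its `assignKeyToParallelOperator` method), but it is a standalone reimplementation that may differ in details between versions. `maxParallelism = 128` is an assumption: I believe it is Flink's default for a parallelism of 48 when no max parallelism is set. The mapping is deterministic as long as the key's `hashCode()` is stable across JVMs (true for `String`). Which TaskManager hosts a given subtask index is decided by the scheduler, so "machine1" is not guaranteed to hold indices 0-15.

```java
// Standalone sketch of key -> key group -> subtask routing, modeled on
// Flink's KeyGroupRangeAssignment (details may differ by Flink version).
public class KeyRoutingSketch {

    // Murmur-style mixing of the key's hashCode, modeled on Flink's MathUtils.murmurHash.
    public static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // final bit mixing
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Step 1: key -> key group in [0, maxParallelism).
    public static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> subtask index in [0, parallelism).
    public static int subtaskIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 48;
        int maxParallelism = 128; // assumed default for parallelism 48
        for (int i = 0; i < 3; i++) {
            // Same key and same (maxParallelism, parallelism) always give the same subtask.
            System.out.println("1.2.3.4 -> subtask " + subtaskIndex("1.2.3.4", maxParallelism, parallelism));
        }
        System.out.println("5.6.7.8 -> subtask " + subtaskIndex("5.6.7.8", maxParallelism, parallelism));
    }
}
```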
The open() method is called once per operator instance inside the TaskManager JVM. Therefore Flink creates 48 instances of the flatMap operator (instances 1-16 reside on machine1, 17-32 on machine2, and 33-48 on machine3), and each instance runs open(). Does that mean open() runs 48 times?
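The lifecycle described above can be simulated in plain Java. This is not Flink code: `MyFunction` and `deploy` are hypothetical stand-ins for a `RichFlatMapFunction` and the runtime. The model assumes one function instance per parallel subtask, with `open()` called once on each instance before it processes records. In real Flink, a restart after a failure creates fresh instances, so `open()` would run again for those.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink code): one function instance per parallel
// subtask, and open() is called once on each instance before processing starts.
public class OpenPerSubtaskSimulation {

    // Hypothetical stand-in for a RichFlatMapFunction instance.
    public static class MyFunction {
        public final int subtaskIndex;
        public int openCalls = 0;

        public MyFunction(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        // In Flink, this is where the ValueState handle would be obtained.
        public void open() {
            openCalls++;
        }
    }

    // Hypothetical "runtime": creates `parallelism` instances and opens each once.
    public static List<MyFunction> deploy(int parallelism) {
        List<MyFunction> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            MyFunction f = new MyFunction(i);
            f.open();
            instances.add(f);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<MyFunction> instances = deploy(3 * 16); // 3 workers x 16 parallel instances
        int totalOpenCalls = 0;
        for (MyFunction f : instances) {
            totalOpenCalls += f.openCalls;
        }
        System.out.println(instances.size() + " instances, open() called " + totalOpenCalls + " times");
    }
}
```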
In the end, all 48 instances access the same state, but with different values (because state is local). In other words, one group of instances (say, the 16 instances on machine1) will get the same state value.
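For comparison, here is a rough model of keyed state scoping. It is not Flink's implementation, and `SubtaskState` is a hypothetical stand-in. Within one subtask, the single `ValueState` registered in `open()` behaves like a map from key to value. Before each record is processed, the runtime sets the current key, and `value()`/`update()` act only on that key's entry. Under this model, two IPs handled by the same subtask do not see each other's values. Because each key is routed to exactly one subtask, a key's value also lives in only one place.

```java
import java.util.HashMap;
import java.util.Map;

// Rough model (not Flink's implementation) of keyed ValueState inside ONE subtask:
// value()/update() operate on the entry for the key of the current record.
public class KeyedStateModel {

    // Hypothetical stand-in for the keyed state held by a single subtask.
    public static class SubtaskState {
        private final Map<String, Boolean> valueStateByKey = new HashMap<>();
        private String currentKey;

        // The runtime sets the current key before each record is processed.
        public void setCurrentKey(String key) { currentKey = key; }

        public Boolean value() { return valueStateByKey.get(currentKey); }

        public void update(Boolean v) { valueStateByKey.put(currentKey, v); }
    }

    public static void main(String[] args) {
        SubtaskState subtask = new SubtaskState();

        subtask.setCurrentKey("1.2.3.4");
        subtask.update(true);

        subtask.setCurrentKey("5.6.7.8"); // another IP routed to the same subtask
        System.out.println("5.6.7.8 sees " + subtask.value()); // null: its own, still empty, entry

        subtask.setCurrentKey("1.2.3.4");
        System.out.println("1.2.3.4 sees " + subtask.value()); // true
    }
}
```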
Lastly, if there were no keyBy before the flatMap, could requests from IP address 1.2.3.4 go to machine1, machine2, or machine3 in a random manner?
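Without `keyBy`, my understanding is that Flink uses forward partitioning when the upstream and downstream parallelism match. Otherwise it uses rebalance, which is round-robin starting from a randomly chosen channel. In either case the IP plays no part in routing. Also, as far as I know, `ValueState` obtained through `getRuntimeContext().getState(...)` is keyed state and is only available on a keyed stream. The simulation below is not Flink code. It models rebalance-style round-robin with a fixed, hypothetical start index so the output is reproducible.

```java
// Simulation (not Flink code) of round-robin distribution, similar to Flink's
// rebalance partitioning. Flink picks the starting channel at random; here it is
// a parameter so the result is reproducible.
public class RoundRobinSketch {

    // Returns, for each record, the subtask index it would be sent to.
    public static int[] route(String[] records, int parallelism, int start) {
        int[] targets = new int[records.length];
        int next = start;
        for (int i = 0; i < records.length; i++) {
            targets[i] = next;               // the record's content (the IP) is ignored
            next = (next + 1) % parallelism; // move on to the next subtask
        }
        return targets;
    }

    public static void main(String[] args) {
        String[] requests = {"1.2.3.4", "5.6.7.8", "1.2.3.4", "1.2.3.4"};
        int[] targets = route(requests, 48, 0);
        for (int i = 0; i < requests.length; i++) {
            System.out.println(requests[i] + " -> subtask " + targets[i]);
        }
        // 1.2.3.4 lands on subtasks 0, 2 and 3: its requests are spread out.
    }
}
```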