
I know that keyed state belongs to its key: only the current key can access its state value, and other keys cannot access a different key's state value.

I tried to access the state with the same key, but from a different stream. Is that possible?

If it is not possible, will I end up with two copies of the same data?

Note: I need two streams because each of them has a different time window and a different implementation.

Here is an example (I know that keyBy(something) is the same for both stream operations):

public class Sample {
    // streamA is a DataStream defined elsewhere; both pipelines use the same keyBy(something)
    void buildPipelines() {
        streamA
                 .keyBy(something)
                 .timeWindow(Time.seconds(4))
                 .process(new CustomMyProcessFunction())
                 .name("CustomMyProcessFunction")
                 .print();

        streamA
                 .keyBy(something)
                 .timeWindow(Time.seconds(1))
                 .process(new CustomMyAnotherProcessFunction())
                 .name("CustomMyAnotherProcessFunction")
                 .print();
    }
}

public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
    private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;
    private SimpleEntity simpleEntity;

    @Override
    public void open(Configuration parameters) throws Exception
    {
        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value == null)
        {
            SimpleEntity newVal = new SimpleEntity("sample");
            logger.info("New Value put");
            simpleEntityValueState.update(newVal);
        }
        ...
    }
...
}

public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
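{
    // logger was used in process() but never declared in this class
    private Logger logger = LoggerFactory.getLogger(CustomMyAnotherProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;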

    @Override
    public void open(Configuration parameters) throws Exception
    {

        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value != null)
            logger.info(value.toString()); // I expect SimpleEntity("sample") here
        out.collect(...);
    }
...
}

3 Answers


I tried your idea of sharing state between two operators using the same key.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.IOException;

public class FlinkReuseState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStream<Integer> stream1 = env.addSource(new SourceFunction<Integer>() {
            // volatile so that cancel(), called from another thread, stops the loop
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                while (running) {
                    sourceContext.collect(1);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        DataStream<Integer> stream2 = env.addSource(new SourceFunction<Integer>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                while (running) {
                    sourceContext.collect(1);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });


        DataStream<Integer> windowedStream1 = stream1.keyBy(Integer::intValue)
                .timeWindow(Time.seconds(3))
                .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                    private ValueState<Integer> value;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<Integer>("value", Integer.class);
                        value = getRuntimeContext().getState(desc);
                    }

                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
                        iterable.forEach(x -> {
                            try {
                                if (value.value() == null) {
                                    value.update(1);
                                } else {
                                    value.update(value.value() + 1);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        });
                        collector.collect(value.value());
                    }
                });

        DataStream<String> windowedStream2 = stream2.keyBy(Integer::intValue)
                .timeWindow(Time.seconds(3))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {

                    private ValueState<Integer> value;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<Integer>("value", Integer.class);
                        value = getRuntimeContext().getState(desc);
                    }

                    @Override
                    public void process(Integer s, Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
                        iterable.forEach(x -> {
                            try {
                                if (value.value() == null) {
                                    value.update(1);
                                } else {
                                    value.update(value.value() + 1);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        });
                        collector.collect(String.valueOf(value.value()));
                    }
                });

        windowedStream2.print();

        windowedStream1.print();

        env.execute();

    }
}

It doesn't work: each operator only updates its own value state. The output is listed below.

3> 3
3> 3
3> 6
3> 6
3> 9
3> 9
3> 12
3> 12
3> 15
3> 15
3> 18
3> 18
3> 21
3> 21
3> 24
3> 24

Based on the official docs on keyed state, *Each keyed-state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>*.

I think it is not possible to share state by giving the same name to states in different operators.
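To make the <operator, key> scoping concrete, here is a toy model in plain Java. It is not Flink code and not how Flink's state backends are implemented; the class and method names are hypothetical. Each value is addressed by operator, state name and key, so two operators that both register a state named "value" for key 1 still get separate entries, which is what the experiment output shows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only (hypothetical), not Flink's implementation.
public class KeyedStateModel {

    // Every state value is addressed by (operator, state name, key).
    private final Map<List<Object>, Integer> backend = new HashMap<>();

    // Increments the "value" counter of one operator for one key, like the
    // ProcessWindowFunctions in the experiment, and returns the new count.
    public int increment(String operatorId, Object key) {
        List<Object> address = List.of(operatorId, "value", key);
        int next = backend.getOrDefault(address, 0) + 1;
        backend.put(address, next);
        return next;
    }
}
```

Because the operator id is part of the address, using the same state name and the same key in a second operator starts a new counter instead of reading the first one.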

Have you tried a CoProcessFunction? With it you can implement separate processing logic for each stream; the only remaining problem would be the time windows. Can you provide more details about your processing logic?
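A minimal sketch of the CoProcessFunction idea, assuming Flink 1.12+ (for executeAndCollect) and the Flink 1.x open(Configuration) signature. SharedStateSketch, SharedCounter and the "count" state are hypothetical names. Because both keyed streams are connected into a single operator, processElement1 and processElement2 read and write the same ValueState for a given key. The 1 s and 4 s windows from the question are not reproduced here; they would have to be emulated with timers (ctx.timerService()).

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

public class SharedStateSketch {

    // One operator with two inputs: both methods see the same keyed state.
    public static class SharedCounter
            extends KeyedCoProcessFunction<Integer, Integer, Integer, String> {

        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.INT));
        }

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out)
                throws Exception {
            // Logic for the first stream goes here.
            out.collect("A:" + ctx.getCurrentKey() + "=" + increment());
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out)
                throws Exception {
            // Logic for the second stream goes here; it sees A's updates.
            out.collect("B:" + ctx.getCurrentKey() + "=" + increment());
        }

        // Shared by both inputs: reads and updates the per-key counter.
        private int increment() throws Exception {
            Integer current = count.value();
            int next = (current == null ? 0 : current) + 1;
            count.update(next);
            return next;
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> streamA = env.fromElements(1, 1, 1);
        DataStream<Integer> streamB = env.fromElements(1, 1);
        return streamA.keyBy(a -> a)
                .connect(streamB.keyBy(b -> b))
                .process(new SharedCounter())
                .executeAndCollect(100);
    }
}
```

With three elements on one input and two on the other, the counter for key 1 reaches 5 instead of stopping at 3 and 2, because both inputs update one state. The order in which A and B elements interleave is not deterministic.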


As has been pointed out already, state is always local to a single operator instance. It cannot be shared.

What you can do, however, is stream the state updates from the operator holding the state to other operators that need it. With side outputs you can create complex dataflows without needing to share state.
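A minimal sketch of streaming state updates out of the owning operator through a side output, assuming Flink 1.12+. The class names, the UPDATES tag and the (key, value) update format are hypothetical. The owner keeps the authoritative ValueState and emits each change; a downstream KeyedCoProcessFunction keeps its own copy. This is a copy rather than shared state: the reader only sees an update after it has travelled downstream, so it may read a stale or missing value.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;

public class StateUpdatesSketch {

    // Side output carrying (key, new state value); the {} keeps the generic type.
    static final OutputTag<Tuple2<Integer, Integer>> UPDATES =
            new OutputTag<Tuple2<Integer, Integer>>("state-updates") {};

    // Owns the state and publishes every change on the side output.
    public static class Owner extends KeyedProcessFunction<Integer, Integer, String> {
        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
        }

        @Override
        public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
            Integer current = count.value();
            int next = (current == null ? 0 : current) + 1;
            count.update(next);
            out.collect("owner=" + next);                               // normal result
            ctx.output(UPDATES, Tuple2.of(ctx.getCurrentKey(), next));  // state change
        }
    }

    // Keeps a local, read-only mirror of the owner's state for the same key.
    public static class Reader
            extends KeyedCoProcessFunction<Integer, Integer, Tuple2<Integer, Integer>, String> {
        private transient ValueState<Integer> mirror;

        @Override
        public void open(Configuration parameters) {
            mirror = getRuntimeContext().getState(new ValueStateDescriptor<>("mirror", Types.INT));
        }

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
            // Own input: may run before or after the latest update has arrived.
            out.collect("reader saw " + mirror.value());
        }

        @Override
        public void processElement2(Tuple2<Integer, Integer> update, Context ctx,
                                    Collector<String> out) throws Exception {
            mirror.update(update.f1);
            out.collect("mirror=" + update.f1);
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<String> owner =
                env.fromElements(1, 1, 1).keyBy(x -> x).process(new Owner());
        DataStream<Tuple2<Integer, Integer>> updates = owner.getSideOutput(UPDATES);
        return env.fromElements(1, 1)
                .keyBy(x -> x)
                .connect(updates.keyBy(u -> u.f0))
                .process(new Reader())
                .executeAndCollect(100);
    }
}
```

With parallelism 1 the reader's output always ends up containing mirror=3, while the "reader saw" lines depend on how its two inputs interleave.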


Why can't you emit the state as part of a map operation? That stream can then be connected to the other stream.