I'm following the example in the official Flink documentation to understand how connectedStreams
work. Here is the example: https://ci.apache.org/projects/flink/flink-docs-master/learn-flink/etl.html#connected-streams
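    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class StreamingJob {
        private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Both streams are keyed by the element itself, so the state is per word.
            DataStream<String> control = env.fromElements("DROP", "IGNORE").keyBy(x -> x);
            DataStream<String> streamOfWords = env.fromElements("Apache", "DROP", "Flink", "IGNORE").keyBy(x -> x);

            control
                .connect(streamOfWords)
                .flatMap(new ControlFunction())
                .print();

            env.execute();
        }

        public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
            // Keyed state: set once the current key has been seen on the control stream.
            private ValueState<Boolean> blocked;

            @Override
            public void open(Configuration config) {
                blocked = getRuntimeContext().getState(new ValueStateDescriptor<>("blocked", Boolean.class));
            }

            // Called for each element of the control stream.
            @Override
            public void flatMap1(String control_value, Collector<String> out) throws Exception {
                blocked.update(Boolean.TRUE);
            }

            // Called for each element of streamOfWords.
            @Override
            public void flatMap2(String data_value, Collector<String> out) throws Exception {
                if (blocked.value() == null) {
                    out.collect(data_value);
                }
            }
        }
    }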
As I understand it, the first parameter control_value of the method flatMap1 should receive the elements of the control stream, and the first parameter data_value of the method flatMap2 should receive the elements of streamOfWords.
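To make that expectation concrete, here is a minimal plain-Java sketch of the behaviour I expect. It does not use Flink at all: the class ConnectedStreamsSketch, its run method, and the Set standing in for the keyed "blocked" state are hypothetical names made up for illustration. It also assumes that every control element arrives before any word, which as far as I know Flink does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java stand-in for the connected-streams example; none of this is Flink API.
public class ConnectedStreamsSketch {

    // Keys seen on the control stream (stands in for the keyed ValueState "blocked").
    private final Set<String> blockedKeys = new HashSet<>();

    // Expected to be called once per element of the first (control) stream.
    void flatMap1(String controlValue, List<String> out) {
        blockedKeys.add(controlValue);
    }

    // Expected to be called once per element of the second (data) stream.
    void flatMap2(String dataValue, List<String> out) {
        if (!blockedKeys.contains(dataValue)) {
            out.add(dataValue);
        }
    }

    // Assumption: all control elements are processed before any word.
    static List<String> run(List<String> control, List<String> words) {
        ConnectedStreamsSketch fn = new ConnectedStreamsSketch();
        List<String> out = new ArrayList<>();
        for (String c : control) {
            fn.flatMap1(c, out);
        }
        for (String w : words) {
            fn.flatMap2(w, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> result = run(
                Arrays.asList("DROP", "IGNORE"),
                Arrays.asList("Apache", "DROP", "Flink", "IGNORE"));
        System.out.println(result); // prints [Apache, Flink]
    }
}
```

Under that ordering assumption, only the words that never appeared on the control stream are emitted.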
However, when I try to print them, I always get empty values.
Here are my changes:
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        LOG.info("flatMap1111111: ", control_value);
    }

    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        LOG.info("flatMap2222222: ", data_value);
    }
After executing this job, I can see the following in the log file ./log/flink-root-taskexecutor-0-localhost.localdomain.log:
2020-07-25 02:40:30,152 INFO myflink.StreamingJob - flatMap1111111:
2020-07-25 02:40:30,153 INFO myflink.StreamingJob - flatMap1111111:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
2020-07-25 02:40:30,174 INFO myflink.StreamingJob - flatMap2222222:
As you see, they are all empty.
Did I do something wrong or misunderstand how connectedStreams work?
control.connect(streamOfWords).flatMap(new ControlFunction()).print(); env.execute("StreamingJob");
is still there. – Yves