
I am using Flink 1.3. I have defined two stream sources that emit the same type of events, which should be processed by the same downstream operators (my own process operator and a sink operator).

But it looks like in the source-process-sink pipeline I can only specify one source. How can I specify two or more sources and apply the same process and sink to all of them?

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) // How do I add MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}
Answer

The DataStream API gives you a lot of flexibility in how you set up your processing pipelines. If you want to apply the same logic to multiple sources, you can simply build one pipeline per source:

env.addSource(new MySource1())
  .process(new MyProcess())
  .addSink(new MySink())

env.addSource(new MySource2())
  .process(new MyProcess())
  .addSink(new MySink())

env.execute()

Or, if it makes more sense, you could union the two streams (they must have the same element type) and then process the combined stream, or use some combination of these approaches:

stream1.union(stream2)
  .process(...)
  .addSink(...)
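Applied to the setup in the question, the union approach could look roughly like the following. This is a minimal sketch, not something verified against Flink 1.3 specifically: `UpperCaseProcess`, `CollectSink` and `UnionExample` are hypothetical stand-ins for `MyProcess`, `MySink` and `FlinkApplication`, and `env.fromElements` replaces the custom sources `MySource1`/`MySource2`. The static-queue sink is a common testing trick and only works in a local, single-JVM environment.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-in for MyProcess: upper-cases every event it receives.
class UpperCaseProcess extends ProcessFunction[String, String] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {
    out.collect(value.toUpperCase)
  }
}

// Hypothetical stand-in for MySink: stores results in a JVM-wide queue.
// Only meaningful with a local (single-JVM) environment, e.g. in tests.
object CollectSink {
  val values = new ConcurrentLinkedQueue[String]()
}

class CollectSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    CollectSink.values.add(value)
  }
}

object UnionExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Two independent sources (stand-ins for MySource1 and MySource2).
    val stream1: DataStream[String] = env.fromElements("a", "b").name("source1")
    val stream2: DataStream[String] = env.fromElements("c", "d").name("source2")

    // union merges streams of the same element type into one DataStream,
    // so the process and sink operators are declared only once.
    stream1.union(stream2)
      .process(new UpperCaseProcess())
      .setParallelism(4)
      .addSink(new CollectSink())
      .setParallelism(2)

    env.execute("UnionExample")
  }
}
```

Each source keeps its own parallelism, and the order in which elements from the two sources arrive at the process operator is not defined, so downstream logic should not rely on it.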

It's also possible to do things the other way around if you want to fork the stream and apply different operations to each copy:

val stream: DataStream[T] = env.addSource(new MySource())

stream.process(new MyProcess1())
  .addSink(new MySink1())

stream.process(new MyProcess2())
  .addSink(new MySink2())

env.execute()

And wow, Flink 1.3 is more than three years old!