I want to first process some static data with the DataSet API and then run a streaming job with the DataStream API. When I run the code from my IDE, it works perfectly. But when I submit it to a local Flink jobmanager (all parallelism set to 1), the streaming code never executes!
For example, the following code does not work:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._ // also brings the implicit TypeInformation into scope

val parallelism = 1

// streaming environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

// batch (DataSet) environment for the static data
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
// collect() executes the batch job and brings the result back to the driver
val myVal: Int = myStaticData.reduce(_ + _).collect().head

// streaming iteration that adds the precomputed value on each pass
val theStream = env.fromElements(1).iterate(iteration => {
  val result = iteration.map(x => x + myVal)
  (result, result) // (feedback stream, output stream)
})

theStream.print()
env.execute("static and streaming together")
What should I try to get this working?
Logs: execution logs for the above program
Execution plan: plan. Seems to be acyclic.