2 votes

I want to first manipulate static data using the DataSet API and then run a streaming job with the DataStream API. The code works perfectly when I run it from my IDE, but when I run it on a local Flink jobmanager (all parallelism set to 1), the streaming part never executes!

For example, the following code is not working:

val parallelism = 1

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)

val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)

val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head

val theStream = env.fromElements(1).iterate( iteration => {
  val result = iteration.map(x => x + myVal)
  (result, result)
})
theStream.print()
env.execute("static and streaming together")

What should I try to get this thing working?

Logs: execution logs for the above program

Execution plan: plan. It seems to be acyclic.

1
What are the logs saying? – Till Rohrmann
@TillRohrmann Link added. – Vishal Goel
What does the client log say? – Till Rohrmann
@TillRohrmann I ran it on the local jobmanager. The *.out file is empty; the job finishes immediately. – Vishal Goel
@TillRohrmann The execution plan created by the jobmanager seems to be acyclic! link – Vishal Goel

1 Answer

4 votes

If your Flink program consists of multiple Flink jobs, e.g. because eager operations such as count, collect, or print each trigger their own job execution, then you cannot submit it via the web interface. The web interface only supports programs that consist of a single Flink job.
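One way to sidestep the limitation (a sketch, not part of the original answer): if the static computation is simple enough, compute it in plain Scala in the driver program instead of through a DataSet job, so that the program submits only the single streaming job. The object name `StaticAndStreaming` is made up for illustration; parallelism and the iteration body follow the question's code.

```scala
import org.apache.flink.streaming.api.scala._

object StaticAndStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Compute the "static" value with plain Scala rather than a DataSet
    // job, so no extra Flink job is triggered before the streaming one.
    val myVal: Int = (1 to 10).sum

    // Same iteration as in the question: feed the incremented value back.
    val theStream = env.fromElements(1).iterate(iteration => {
      val result = iteration.map(x => x + myVal)
      (result, result)
    })
    theStream.print()
    env.execute("single streaming job")
  }
}
```

Alternatively, a program that really does contain multiple jobs can still be run; it just has to be submitted with the command-line client (`bin/flink run ...`) rather than through the web interface.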