
I'm trying to set the overall parallelism in Flink 1.8.3 (Java), as per the documentation:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

Aside from that, I also call setParallelism(2) on the source and the sink.

I can also see in the Flink UI that the environment setting is applied (long-running session cluster; job submitted via the REST API or the Flink UI):

[Screenshot: Flink job configuration]

But when I look at the parallelism of the individual stages in the Flink UI, they all run with parallelism 1 (except the source and sink, which run with the expected parallelism):

[Screenshot: operator parallelism in the Flink UI]

I also tried setting the parallelism on the individual operators instead, but it did not change anything. The operators are ordinary flatMap and filter functions.

What is misconfigured here, such that not all operators respect the parallelism setting? Can't I assume that setting the parallelism at the environment level automatically applies it to all operators? In other words, is there anything else I need to watch out for when setting the parallelism?

If you launch the streaming job with the -p parameter, it will override your default parallelism, e.g. ./bin/flink run -p 10 ../examples/*WordCount-java*.jar. Also check the flink-conf.yaml file, which contains the parallelism.default parameter. If it is set to 1, this is probably the reason. - Felipe
We start the jobs via the Flink UI / REST API (I have edited the question to include that), so nothing should override the setting there. According to the docs, environment settings override the defaults from the YAML file, so whatever is set there should not be applied, since I set the StreamExecutionEnvironment parallelism. - Wolli
I would try submitting the job from the CLI and compare the results. Maybe something in the Flink UI is misleading you. I would also test from the IDE to make sure that the program applies the parallelism you set. - Felipe
I think using the CLI should be no different. But I tried specifying the parallelism via the API, and then it is applied correctly (ref: ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/…). Something is fishy here; I will dig deeper and report back what I find. - Wolli
Nice that it worked. Why not use Flink 1.11? Maybe it is already fixed there. - Felipe
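To make the expected behaviour from the thread concrete, here is a simplified model of the precedence order as I read Flink's "Parallel Execution" documentation: operator level beats execution environment level, which beats client level (-p, REST API, Flink UI), which beats the system level (parallelism.default in flink-conf.yaml). This is a hypothetical plain-Java helper, not Flink's actual code; the class and method names are made up for illustration.

```java
import java.util.OptionalInt;

public class ParallelismPrecedence {

    // Simplified model (assumption: mirrors the documented order, not Flink internals):
    // operator level > environment level > client level > system default.
    static int effectiveParallelism(OptionalInt operatorLevel,
                                    OptionalInt environmentLevel,
                                    OptionalInt clientLevel,
                                    int systemDefault) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();
        }
        if (clientLevel.isPresent()) {
            return clientLevel.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        OptionalInt env = OptionalInt.of(4);     // env.setParallelism(4)
        OptionalInt none = OptionalInt.empty();  // nothing set at this level
        int yamlDefault = 1;                     // parallelism.default: 1 (assumed)

        // Source and sink call setParallelism(2) themselves.
        System.out.println("source/sink: "
                + effectiveParallelism(OptionalInt.of(2), env, none, yamlDefault));
        // flatMap/filter set nothing, so per the docs they should inherit the environment value.
        System.out.println("flatMap/filter: "
                + effectiveParallelism(none, env, none, yamlDefault));
    }
}
```

Under this model the flatMap and filter stages should report 4, which is why the observed value of 1 in the question looks like a deviation from the documented behaviour rather than a misunderstanding of it.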

2 Answers


I "fixed" it by not changing the parallelism from inside the Flink job code, and instead passing a parallelism setting when starting the job. This is possible not only via the CLI, but also via the REST API and the Flink UI. Everything now works as expected for us.
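For the REST API route, a minimal sketch of passing the parallelism at submission time might look like this. It assumes the jar was already uploaded via POST /jars/upload, that the JobManager REST endpoint is at http://localhost:8081 (an assumed address), and that the run request body for POST /jars/:jarid/run accepts a "parallelism" field; check the REST API reference for your Flink version before relying on it. It uses the JDK 11+ java.net.http client, and the class and helper names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitWithParallelism {

    // JSON body for POST /jars/:jarid/run (assumption: "parallelism" is accepted there).
    static String runRequestBody(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        return "{\"parallelism\": " + parallelism + "}";
    }

    // Builds, but does not send, the request that starts an already-uploaded jar.
    static HttpRequest runRequest(String restBaseUrl, String jarId, int parallelism) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBaseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(runRequestBody(parallelism)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("usage: SubmitWithParallelism <jar-id returned by /jars/upload>");
            return;
        }
        // Assumed REST address of the session cluster.
        String restBaseUrl = "http://localhost:8081";
        HttpRequest request = runRequest(restBaseUrl, args[0], 4);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Keeping the request construction separate from sending it makes the URL and body easy to inspect without a running cluster.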


I faced a similar issue recently; my problem was with taskmanager.numberOfTaskSlots. Make sure that your TaskManagers have enough slots available.
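A quick way to sanity-check the slot budget: with Flink's default slot sharing (all operators in one slot sharing group), a job needs as many slots as its highest operator parallelism, and a cluster offers roughly TaskManagers × taskmanager.numberOfTaskSlots slots (minus whatever other jobs in a session cluster already occupy). The sketch below is plain-Java arithmetic with hypothetical names and assumed cluster sizes, not a Flink API.

```java
public class SlotCheck {

    // With default slot sharing, the job needs as many slots as its highest operator parallelism.
    static int requiredSlots(int... operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Total slots offered: number of TaskManagers x taskmanager.numberOfTaskSlots.
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Assumed cluster: 1 TaskManager with taskmanager.numberOfTaskSlots: 2.
        int available = availableSlots(1, 2);
        // Source 2, flatMap 4, filter 4, sink 2 (the setup from the question).
        int required = requiredSlots(2, 4, 4, 2);
        System.out.println("required=" + required + ", available=" + available
                + (required > available ? " -> not enough slots" : " -> fits"));
    }
}
```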