0
votes

We have a pipeline with operations, split into 2 workloads - Source -> Transform are in a first group and are CPU-intensive workloads, they are put into the same slot sharing group, lets say source. And Sink, RAM-intensive workload, as it uses Bulk upload and holds amount of data in memory. It's sent to sink slot sharing group.

Additionally, we have a different parallelism level of Source -> Transform workload and Sink workload as the first one is limited by source parallelism. So, for example, we have Source -> Transform parallelism of 50, meanwhile Sink parallelism equal to 78. And we have 8 TMs, each with 16 cores (and therefore slots).

In this case, the ideal slots allocation strategy for us seems to be allocating 6-7 slots on each TM for Source -> Transform, and the rest - for Sink leading CPU-RAM workloads to be roughly evenly distributed across all TMs.

So, I wonder whether there is some config setting which will tell to distribute slot sharing groups evenly ?

I only found cluster.evenly-spread-out-slots config parameter, but I'm not sure whether it actually evenly distributes slot sharing groups, not only slots - for example, I get TMs with 10 Source -> Transform tasks meanwhile I would expect 6 or 7.

So, the question is whether it is possible to tell Flink to dsitribute slot sharing groups evenly across cluster ? Or probably there is any other possibility to do it ?

Distribute a Flink operator evenly across taskmanagers seems a bit similar to my question, but I'm mostly asking about slot sharing groups distribution. This topic also contains only suggestion of using cluster.evenly-spread-out-slots but probably something has changed since then.

2

2 Answers

0
votes

I tried once to achieve this but the problem is that Flink does not give a feature to enable operator placement. The close that I could get was to use the .map(...).slotSharingGroup("name");. As the documentation about "Set slot sharing group" says:

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").

someStream.filter(...).slotSharingGroup("name");

So, I defined different groups based on the number of tasks slots that I have, together with the parallelism.

0
votes

I was able to find a workaround to get the even distribution of slot sharing groups.

Starting from flink 1.9.2, even tasks distribution feature has been introduced, which can be turned on via cluster.evenly-spread-out-slots: true in the flink-conf.yaml: FLINK-12122 Spread out tasks evenly across all available registered TaskManagers. I tried to enable it and it didn't work. After digging a bit, I managed to find the developer's comment which stated that this feature works only in standalone mode as it requires resources to be preliminary pre-allocated - https://issues.apache.org/jira/browse/FLINK-12122?focusedCommentId=17013089&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17013089":

the feature only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling. Hence, when you are using the active Yarn mode and submit the first job, then there won't be any TMs registered. Consequently, Flink will allocate the first container, fill it up and then only allocate a new container. However, if you start Flink in standalone mode or after your first job finishes on Yarn there are still some TMs registered, then the next job would be spread out.

So, the idea is to start a detached yarn session with the increased idle containers timeout setting, first submit some short living fake job, which will simply acquires the required amount of resources from YARN and completes, and then start immediately the main pipeline which will be assigned to already allocated containers and in this case the cluster.evenly-spread-out-slots: true does the trick and distributes all slot sharing groups evenly.

So, to sum up, the following was done to get the evenly distributed slot sharing groups within the job:

  1. resourcemanager.taskmanager-timeout was increased to allow the main job be submitted before the container released for an idle task manager. I increased this to 1 minute and this was more then enough.
  2. started a yarn-session and submitted job dynamically to it.
  3. tweaked the main job to call first for a fake job which simply allocates the resources. In my case, this simple code does the trick before configuring the main pipeline:
val env = StreamExecutionEnvironment.getExecutionEnvironment

val job = env
    .fromElements(0)
    .map { x =>
        x * 2
    }
    .setParallelism(parallelismMax)
    .print()

val jobResult = env.execute("Resources pre-allocation job")
println(jobResult)

print("Done. Starting main job!")