10
votes

Consider a Flink cluster in which each node has a multi-core processor. If we configure the number of task slots on each node to match its number of cores, with an equal share of memory per slot, how does Apache Flink distribute tasks across the nodes and their free slots? Are the slots treated fairly?
Is there any way to configure Flink to treat the slots equally when the task slots are configured based on the number of cores available on a node?
For instance, assume that we partition the data equally and run the same task over each partition. Flink uses all the slots on some nodes while other nodes stay completely idle. A node with fewer CPU cores in use outputs its results much faster than a node with more cores in use. Moreover, the speedup is not proportional to the number of cores used on each node. In other words, if one core is occupied on one node and two cores on another, then, if each core were treated fairly as a slot, every slot should finish the same task in roughly the same time regardless of which node it belongs to. That is not what happens here.
Given this, I would say that the nodes are not treated equally. As a result, the running time does not scale in proportion to the number of nodes available, and we cannot say that increasing the number of slots necessarily reduces the running time.

I would appreciate any comment from the Apache Flink Community!!

2
I think you should try asking in the Flink community ([email protected]) -- see flink.apache.org/community.html#mailing-lists - Leo
Thanks Leo, it seems I should follow your recommendation! - Ahmad.S
@A.Samiei I was wondering whether you got a response from the Apache community and, if so, whether you could share it here? - Rakesh

2 Answers

2
votes

Flink's default strategy since version 1.5 considers every slot to be equivalent in terms of resources. Under this assumption, it should not matter, resource-wise, where tasks are placed, since all slots should be the same. Given this, the main objective when placing tasks is to colocate them with their inputs in order to minimize network I/O.

If we are now in a standalone setup where we have a fixed number of TaskManagers running, Flink will pick slots in an arbitrary fashion (no guarantee given) for the sources and then colocate their consumers in the same slots if possible.

When running Flink on Yarn or Mesos where Flink can start new TaskManagers, Flink will first use up all slots of an existing TaskManager before it requests a new one. In this case, you will see that all sources will end up on as few TaskManagers as possible.

Since CPUs are not isolated wrt slots (they are a shared resource), the above-mentioned assumption does not hold true in all cases. Hence, in some cases where you have a fixed set of TaskManagers it is actually beneficial to spread the tasks out as much as possible to make use of the shared CPU resources.
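To make the difference concrete, here is a toy model of the two placement behaviours described above: filling one TaskManager before touching the next, versus spreading tasks across all TaskManagers. It is only a sketch under simplifying assumptions (identical TaskManagers, no input locality, one task per slot). It is not Flink's scheduler code, and the function names `fill_first` and `spread_out` are made up for illustration.

```python
from collections import Counter


def fill_first(num_tasks, task_managers, slots_per_tm):
    """Toy model: use up all slots of one TaskManager before moving on."""
    placement = []
    for tm in range(task_managers):
        for _ in range(slots_per_tm):
            if len(placement) == num_tasks:
                return placement
            placement.append(tm)
    return placement  # may be shorter than num_tasks if slots run out


def spread_out(num_tasks, task_managers, slots_per_tm):
    """Toy model: always pick the TaskManager with the most free slots."""
    free = [slots_per_tm] * task_managers
    placement = []
    for _ in range(num_tasks):
        # Ties are broken in favour of the lowest TaskManager index.
        tm = max(range(task_managers), key=lambda i: (free[i], -i))
        if free[tm] == 0:
            break  # no free slot left anywhere
        free[tm] -= 1
        placement.append(tm)
    return placement


# 4 tasks on 3 TaskManagers with 4 slots each
print(Counter(fill_first(4, 3, 4)))  # Counter({0: 4})
print(Counter(spread_out(4, 3, 4)))  # Counter({0: 2, 1: 1, 2: 1})
```

In the first case all four tasks compete for the CPU of a single machine while the other two machines sit idle, which matches the behaviour described in the question. In the second case the load is shared across all three.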

In order to support this kind of scheduling strategy, the Flink community added the spread-out strategy via FLINK-12122. To use a scheduling strategy closer to the pre-FLIP-6 behaviour, where Flink tries to spread the workload across all available TaskExecutors, set cluster.evenly-spread-out-slots: true in flink-conf.yaml.
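As a minimal sketch, the relevant fragment of flink-conf.yaml could look like the following. The slot count of 4 is only an assumed example for a 4-core node, as in the question's setup. The setting probably needs to be in place before the cluster is started, so it is worth checking against the documentation for your Flink version.

```yaml
# flink-conf.yaml (fragment)

# Assumed example value: one slot per core on a 4-core node.
taskmanager.numberOfTaskSlots: 4

# Spread slots evenly across all registered TaskManagers
# instead of filling up one TaskManager first.
cluster.evenly-spread-out-slots: true
```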

1
votes

Very old thread, but there is a newer thread that answers this question for current versions.

With Flink 1.5 we added resource elasticity. This means that Flink is now able to allocate new containers on a cluster management framework like Yarn or Mesos. Due to these changes (which also apply to the standalone mode), Flink no longer reasons about a fixed set of TaskManagers, because if needed it will start new containers (this does not work in standalone mode). Therefore, it is hard for the system to make any decisions about spreading the slots belonging to a single job across multiple TMs. It gets even harder when you consider that some jobs, like yours, might benefit from such a strategy whereas others would benefit from co-locating their slots. It gets even more complicated if you want to do scheduling with respect to multiple jobs that the system does not have full knowledge about because they are submitted sequentially. Therefore, Flink currently assumes that slot requests can be fulfilled by any TaskManager.