
Apache Flink distributes its operators across the available free slots on the TaskManagers (workers). As stated in the documentation, it is possible to set a SlotSharingGroup for every operator in a job. This means that two operators can share the same slot, in which they are later executed.

Unfortunately, this option only lets operators share a slot sharing group; it does not let you assign a streaming operator to a specific slot.
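To make that distinction concrete, here is a minimal Java sketch (Java 16+, for records). `SlotSharingCheck`, `Subtask` and `isValid` are hypothetical names, not Flink API, and the model is deliberately reduced to two rules: a slot only holds subtasks from one slot sharing group, and at most one subtask of each operator. A slot sharing group only narrows down which placements are allowed; which valid placement is actually used is up to the scheduler, and nothing in the API says "this subtask goes into slot 2".

```java
import java.util.*;

/** Hypothetical, simplified model (not Flink code) of what a slot sharing group constrains. */
public class SlotSharingCheck {

    /** One parallel instance of an operator, e.g. subtask 1 of "map". */
    record Subtask(String operator, int index, String sharingGroup) {}

    /**
     * Returns true if the placement (subtask -> slot id) respects the modelled rules:
     * a slot holds subtasks of only one slot sharing group, and at most one subtask per operator.
     */
    static boolean isValid(Map<Subtask, Integer> placement) {
        Map<Integer, String> groupOfSlot = new HashMap<>();
        Map<Integer, Set<String>> operatorsInSlot = new HashMap<>();
        for (Map.Entry<Subtask, Integer> e : placement.entrySet()) {
            Subtask t = e.getKey();
            int slot = e.getValue();
            String previous = groupOfSlot.putIfAbsent(slot, t.sharingGroup());
            if (previous != null && !previous.equals(t.sharingGroup())) {
                return false; // two different slot sharing groups in one slot
            }
            if (!operatorsInSlot.computeIfAbsent(slot, k -> new HashSet<>()).add(t.operator())) {
                return false; // two subtasks of the same operator in one slot
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Subtask src0 = new Subtask("source", 0, "default");
        Subtask map0 = new Subtask("map", 0, "default");
        Subtask map1 = new Subtask("map", 1, "default");
        Subtask sink0 = new Subtask("sink", 0, "exceptional");

        // source and map share slot 0; the "exceptional" sink gets a slot of its own.
        Map<Subtask, Integer> ok = Map.of(src0, 0, map0, 0, map1, 1, sink0, 2);
        // Moving the sink into slot 0 would mix two groups in one slot.
        Map<Subtask, Integer> bad = Map.of(src0, 0, map0, 0, map1, 1, sink0, 0);
        System.out.println(isValid(ok));  // true
        System.out.println(isValid(bad)); // false
    }
}
```

Note that `ok` would stay valid if the sink were moved to slot 7 instead of slot 2: the group expresses "not together with X", never "exactly here".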

So my question is: What would be the best (or at least one) way to manually assign streaming operators to specific slots/workers in Apache Flink?

It would be helpful to explain why you want to assign an operator to a specific slot. - kkrugler
For a research project, I want to measure performance for several configurations and operator assignments. I am not sure how this information would help answer my question. - moosehead42
Your question appears to be an "XY Problem" (ref perlmonks.org/index.pl?node_id=542341), so I asked for more context. The short answer is no: Flink doesn't support specific operator placement; see stackoverflow.com/a/57327197/231762 for more details. - kkrugler
Thanks for the information and the links. So one way to solve this would be to modify Flink's scheduler with a custom SchedulingStrategy. I tried to get into that code, but it is really hard, so I hoped there might be an easier way. - moosehead42

2 Answers


There is ongoing development work in this direction. In particular, see FLIP-56: Dynamic Slot Allocation. I don't know if this goes far enough to satisfy your goals, but at the very least the refactorings and extensions it brings should be helpful.

For more details, see FLINK-14187 and related issues.


You could disable chaining via disableChaining() and start a new chain via startNewChain() to isolate the operator from the others. You can use the Flink Plan Visualizer to check whether your plan really has isolated operators. These modifiers are applied after the operator they refer to, for example:

  .map(...).startNewChain().slotSharingGroup("exceptional")
  // or
  .filter(...).startNewChain().slotSharingGroup("default")
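The effect of these modifiers on a simple linear pipeline can be sketched as follows. This is a simplified, hypothetical model, not Flink code: `ChainingSketch`, `Op` and `Strategy` are illustrative names, `HEAD` stands for `startNewChain()` and `NEVER` for `disableChaining()`. Flink's real chaining check has further conditions (for example equal parallelism and forward partitioning between the two operators) that are left out here.

```java
import java.util.*;

/** Hypothetical model of how startNewChain()/disableChaining() split a linear pipeline into chains. */
public class ChainingSketch {

    // ALWAYS = default, HEAD = startNewChain(), NEVER = disableChaining()
    enum Strategy { ALWAYS, HEAD, NEVER }

    record Op(String name, Strategy strategy, String sharingGroup) {}

    static List<List<String>> chains(List<Op> pipeline) {
        List<List<String>> result = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            // An operator joins its predecessor's chain only if both allow it
            // and both are in the same slot sharing group.
            boolean chainable = previous != null
                    && op.strategy() == Strategy.ALWAYS
                    && previous.strategy() != Strategy.NEVER
                    && op.sharingGroup().equals(previous.sharingGroup());
            if (!chainable) {
                result.add(new ArrayList<>()); // this operator starts a new chain
            }
            result.get(result.size() - 1).add(op.name());
            previous = op;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Op> pipeline = List.of(
                new Op("source", Strategy.ALWAYS, "default"),
                new Op("filter", Strategy.ALWAYS, "default"),
                // .map(...).startNewChain().slotSharingGroup("exceptional")
                new Op("map", Strategy.HEAD, "exceptional"),
                new Op("sink", Strategy.ALWAYS, "exceptional"));
        System.out.println(chains(pipeline)); // [[source, filter], [map, sink]]
    }
}
```

The sink is placed in "exceptional" too because, per the Flink documentation, an operator without an explicit group inherits the slot sharing group of its inputs when they all share one.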

Why isolate it at all? When checkpointing is enabled, Flink snapshots the state of every operator as part of each checkpoint, and a checkpoint only counts once it has been confirmed (persisted). If something fails, Flink rolls back to the last completed checkpoint and starts processing again from there. For all of this, Flink must be able to obtain enough slots before the job runs, in your case enough slots for the "exceptional" group; if it cannot, the whole job stays inactive. Therefore you can NOT tell Flink to use only slot X for operator X and only slot Y for operator Z: to Flink, slots are just computing power that produces intermediate results for the checkpoint (or passes them directly to the next operator).
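The "enough slots beforehand" point can be estimated with a small calculation. This sketch assumes a streaming job whose tasks are all deployed at the same time and the default slot sharing behaviour; under those assumptions the job needs roughly the sum, over all slot sharing groups, of the highest operator parallelism in each group. `RequiredSlots` and `Operator` are hypothetical names used only for illustration.

```java
import java.util.*;

/** Hypothetical estimate of the slots a job needs, given per-operator parallelism and slot sharing group. */
public class RequiredSlots {

    record Operator(String name, int parallelism, String sharingGroup) {}

    static int requiredSlots(List<Operator> operators) {
        // Within one group, subtasks of different operators can share slots,
        // so the group needs as many slots as its most parallel operator.
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Operator op : operators) {
            maxPerGroup.merge(op.sharingGroup(), op.parallelism(), Math::max);
        }
        // Different groups never share slots, so their needs add up.
        return maxPerGroup.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Operator> job = List.of(
                new Operator("source", 2, "default"),
                new Operator("filter", 4, "default"),
                new Operator("map", 3, "exceptional"));
        // 4 slots for "default" + 3 slots for "exceptional"
        System.out.println(requiredSlots(job)); // 7
    }
}
```

Moving an operator into its own group therefore does not pin it anywhere; it only raises the number of slots the cluster must offer before the job can start.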