I am running a wordcount topology in a Storm cluster composed of 2 nodes. One node is the Master node (with Nimbus, UI and Logviewer), and both of them are Supervisors with 1 Worker each. In other words, my Master node is also a Supervisor, and the second node is only a Supervisor.

The topology is configured to use both Workers (setNumWorkers(2)). In detail, it has 1 Spout with 2 threads, 1 split Bolt and 1 count Bolt. When I deploy the topology with the default scheduler, the first Supervisor has 1 Spout thread and the split Bolt, and the second Supervisor has 1 Spout thread and the count Bolt.

Given this context, how can I control the placement of operators (Spout/Bolt) between these 2 Workers? For research purposes, I need some control over the placement of these operators between nodes. However, Storm seems to handle placement transparently, and no such control appears to be available to the end user.

I hope my question is clear enough. Feel free to ask for additional details. I am aware that I may need to dig into Storm's source code and recompile. That's fine. I am looking for a starting point and advice on how to proceed.

The version of Storm I am using is 2.1.0.

1 Answer

Scheduling is handled by a pluggable scheduler in Storm. See the documentation at http://storm.apache.org/releases/2.1.0/Storm-Scheduler.html.

You may want to look at the DefaultScheduler for reference: https://github.com/apache/storm/blob/v2.1.0/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java. This is the default scheduler used by Storm. It has a bit of handling for banning "bad" workers from the assignment, but otherwise largely just does round-robin assignment.
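To make "round robin" concrete, here is a minimal standalone sketch of dealing executors out to worker slots in turn. It does not use any Storm classes: the class name, the `assign` method, the executor labels and the slot names are all made up for illustration, and the executor ordering is an assumption. The real DefaultScheduler also handles system executors and slot availability, which this ignores. With the ordering assumed here, the result happens to match the layout described in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone illustration only: none of these names are Storm classes.
class RoundRobinPlacementSketch {

    // Deals executors out to worker slots in turn, wrapping around at the end.
    static Map<String, List<String>> assign(List<String> executors, List<String> slots) {
        Map<String, List<String>> placement = new LinkedHashMap<>();
        for (String slot : slots) {
            placement.put(slot, new ArrayList<>());
        }
        for (int i = 0; i < executors.size(); i++) {
            String slot = slots.get(i % slots.size());
            placement.get(slot).add(executors.get(i));
        }
        return placement;
    }

    public static void main(String[] args) {
        // Hypothetical executor labels and slot names (assumed ordering).
        List<String> executors = Arrays.asList("spout-1", "spout-2", "split-1", "count-1");
        List<String> slots = Arrays.asList("node1:6700", "node2:6700");
        // Prints {node1:6700=[spout-1, split-1], node2:6700=[spout-2, count-1]}
        System.out.println(assign(executors, slots));
    }
}
```

A custom scheduler would replace the `i % slots.size()` choice with whatever placement rule you need.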

If you don't want to implement a cluster-wide scheduler, you might be able to set your cluster to use the ResourceAwareScheduler, and use a topology-level scheduling strategy instead. You would do this by calling config.setTopologyStrategy(YourStrategyHere.class) when you submit your topology. You will want to implement this interface: https://github.com/apache/storm/blob/e909b3d604367e7c47c3bbf3ec8e7f6b672ff778/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java#L43. You can find an example implementation at https://github.com/apache/storm/blob/c427119f24bc0b14f81706ab4ad03404aa85aede/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
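As a rough idea of the decision a custom strategy could encode, the sketch below pins each component to a node and sends anything unpinned to a fallback node. It is plain Java with no Storm types: `PinnedPlacementSketch`, `place`, the component names and the node names are all hypothetical. In an actual IStrategy you would still have to translate such a mapping into assignments on the cluster state passed to the strategy, which is not shown here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration only: none of these names are Storm classes.
class PinnedPlacementSketch {

    // Maps each executor to the node pinned for its component;
    // executors of unpinned components go to fallbackNode.
    static Map<String, String> place(Map<String, String> executorToComponent,
                                     Map<String, String> componentToNode,
                                     String fallbackNode) {
        Map<String, String> placement = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : executorToComponent.entrySet()) {
            String node = componentToNode.getOrDefault(e.getValue(), fallbackNode);
            placement.put(e.getKey(), node);
        }
        return placement;
    }

    public static void main(String[] args) {
        // Hypothetical executors of the wordcount topology and their components.
        Map<String, String> executorToComponent = new LinkedHashMap<>();
        executorToComponent.put("spout-1", "spout");
        executorToComponent.put("spout-2", "spout");
        executorToComponent.put("split-1", "split");
        executorToComponent.put("count-1", "count");

        // Desired pinning: spout and split on node1; count is left unpinned.
        Map<String, String> componentToNode = new HashMap<>();
        componentToNode.put("spout", "node1");
        componentToNode.put("split", "node1");

        // Prints {spout-1=node1, spout-2=node1, split-1=node1, count-1=node2}
        System.out.println(place(executorToComponent, componentToNode, "node2"));
    }
}
```

Keeping the pinning in a plain map makes it easy to vary the placement between experiments without touching the placement logic.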

Edit: If you implement either an IStrategy or IScheduler, they need to go in a jar that you put in storm/lib on the Nimbus machine. The strategy or scheduler needs to be on the classpath of the Nimbus process.