Hi: I have a large-capacity Storm analysis task coming up. I want to spin up many bolts/workers across different nodes/machines so that every machine shares the load, and I am wondering how to write the bolts/workers/topology so that they can communicate with each other. In the code below I submit the topology on one machine; how should I write the bolt/worker/config on the other machines so that the topology is aware of their bolts/workers? I assume I should not submit the topology on one machine and then submit the same topology again on the other machines. Any hints on Storm worker load sharing? (At the end of the post I have added a rough sketch of the parallelism settings I think are needed; please correct me if that is the wrong direction.)
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class StormClusterMain {
    private static final String SPOUTNAME = "KafkaSpout";
    private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
    private static final String CLIENTID = "ClusterStorm";
    private static final String TOPOLOGYNAME = "ClusterTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private static final String collectionName = "clusterusers";
        private OutputCollector _collector;
        private AtomicInteger index = new AtomicInteger(0);
        private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class);

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            boltLogger.error("Message received:" + tuple.getString(0));
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String zookeepers = null;
        String topicName = null;
        if (args.length == 2) {
            zookeepers = args[0];
            topicName = args[1];
        } else {
            System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
            System.out.println("Usage :.xxx");
            System.exit(-1);
        }

        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
                topicName,
                "", // zookeeper root path for offset storing
                CLIENTID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUTNAME, kafkaSpout, 1);
        builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
                .shuffleGrouping(SPOUTNAME);

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);

        // Topology run
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
    }
}
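
For reference, here is roughly what I think the topology part of main() above should become so that the load actually spreads out. This is only my guess and is untested; the numbers 6 and 12 are just placeholders I made up. The idea is to give the bolt a parallelism hint plus a task count and keep setNumWorkers(3), hoping that Nimbus then schedules the three worker JVMs onto the different supervisor machines.

// Rough sketch (my assumption, not tested): replace the builder/conf lines in main() above
// with parallelism hints so the bolt executors can be spread over the workers that Nimbus
// schedules on the supervisor machines.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);              // one Kafka spout executor
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 6)  // 6 bolt executors (placeholder number)
       .setNumTasks(12)                                  // 12 bolt tasks shared by those executors (placeholder)
       .shuffleGrouping(SPOUTNAME);

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3); // 3 worker JVMs; I assume Nimbus spreads them across the supervisor nodes

StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());

My understanding is that the topology is submitted only once and the other machines only need to run a supervisor pointed at the same Nimbus/ZooKeeper, so nothing topology-specific would live on them. Is that right, or is extra per-machine bolt/worker configuration needed?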