- We have 1 spout and 1 bolt on a single node. The spout reads data from RabbitMQ and emits it to the only bolt, which writes the data to Cassandra.
- Our data source generates 10,000 messages per second, and Storm takes around 10 seconds to process them, which is too slow for us.
- We tried increasing the parallelism of the topology, but that doesn't make any difference.
What is the ideal number of messages that can be processed on a single-node machine with 1 spout and 1 bolt? And what are the possible ways to increase the processing speed of a Storm topology?
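To put a number on the gap, the figures above can be turned into a throughput value. This is only a back-of-the-envelope sketch in plain Java with no Storm dependency; the class and method names (`ThroughputCheck`, `throughput`) are made up for illustration.

```java
public class ThroughputCheck {
    // Messages processed per second, given a message count and the time taken to process it.
    static double throughput(long messages, double seconds) {
        return messages / seconds;
    }

    public static void main(String[] args) {
        long producedPerSecond = 10_000; // messages the source generates each second
        double processingSeconds = 10.0; // time Storm needs to process that one second of data

        double observedRate = throughput(producedPerSecond, processingSeconds);
        System.out.println("observed msgs/sec: " + observedRate);
        System.out.println("shortfall factor: " + producedPerSecond / observedRate);
    }
}
```

So the topology currently handles roughly 1,000 messages per second and would need to be about 10 times faster to keep up with the source.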
Update: This is the sample code. It doesn't include the code for RabbitMQ and Cassandra, but it shows the same performance issue.
    // Topology Class
    public class SimpleTopology {
        public static void main(String[] args) throws InterruptedException {
            System.out.println("hiiiiiiiiiii");
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
            topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");
            Config config = new Config();
            config.setDebug(true);
            config.setNumWorkers(2);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());
            Thread.sleep(2000);
        }
    }
    // Simple Bolt
    public class SimpleBolt implements IRichBolt {
        private OutputCollector outputCollector;

        public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
            this.outputCollector = oc;
        }

        public void execute(Tuple tuple) {
            this.outputCollector.ack(tuple);
        }

        public void cleanup() {
            // TODO
        }

        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            // TODO
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
    // Simple Spout
    public class SimpleSpout implements IRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private boolean completed = false;
        private static int i = 0;

        public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
            this.spoutOutputCollector = soc;
        }

        public void close() {
            // Todo
        }

        public void activate() {
            // Todo
        }

        public void deactivate() {
            // Todo
        }

        public void nextTuple() {
            if (!completed) {
                if (i < 100000) {
                    String item = "Tag" + Integer.toString(i++);
                    System.out.println(item);
                    this.spoutOutputCollector.emit(new Values(item), item);
                } else {
                    completed = true;
                }
            } else {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ex) {
                    Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }

        public void ack(Object o) {
            System.out.println("\n\n OK : " + o);
        }

        public void fail(Object o) {
            System.out.println("\n\n Fail : " + o);
        }

        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields("word"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
Update: Is it possible that, with shuffle grouping, the same tuple will be processed more than once? The configuration used is spouts = 4, bolts = 4. The problem now is that performance decreases as the number of bolts increases.
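For context, my understanding of the Storm documentation is that shuffle grouping hands each emitted tuple to exactly one randomly chosen task of the target bolt, so the grouping by itself should not duplicate tuples. The sketch below is only a stand-alone simulation of that understanding, not Storm code; `ShuffleSketch`, `distribute` and the fixed seed are hypothetical names and values chosen for illustration.

```java
import java.util.Random;

public class ShuffleSketch {
    // Assigns each tuple to exactly one randomly chosen task and
    // returns how many tuples each task received.
    static int[] distribute(int tuples, int tasks, long seed) {
        Random random = new Random(seed);
        int[] perTask = new int[tasks];
        for (int t = 0; t < tuples; t++) {
            perTask[random.nextInt(tasks)]++;
        }
        return perTask;
    }

    public static void main(String[] args) {
        // 100000 tuples as in SimpleSpout, 4 bolt tasks as in the configuration above.
        int[] counts = distribute(100_000, 4, 42L);
        int total = 0;
        for (int task = 0; task < counts.length; task++) {
            System.out.println("task " + task + " received " + counts[task]);
            total += counts[task];
        }
        // Every tuple is delivered once, so the total equals the number emitted.
        System.out.println("total deliveries: " + total);
    }
}
```

If that model is right, the total number of deliveries always equals the number of tuples emitted, and any tuple seen twice would have to come from somewhere else, for example from the spout emitting it again.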
"Our data source generates 10000 messages per second".. are you saying this because of the if(i < 100000) statement inside the nextTuple method? - user2720864