I have created a sample topology to test set the max spout spending property. It is a simple toplogy with 1 spout and a bolt. The spout emits 100000 tuples and the bolt acks after sleeping for a second. I have set the max spout spending property to 10. I assume this means that a spout will not emit any tuples if the non acked messages count is 10 for that spout. But when I run the topology, I can see the spout emitting 2160 messages and then waits. Is my understanding correct or am I missing some thing. I am using storm 0.9.5. Below is the code
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(), 1);
builder.setBolt("bolt", new TestBolt(),1).shuffleGrouping("spout");
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(10);
try {
StormSubmitter.submitTopology("test", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}
public class TestSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int count = 1;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("spoutData"));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
System.out.println(context.maxTopologyMessageTimeout());
}
@Override
public void nextTuple() {
if(count <= 100000) {
System.out.println("Emitting : " + count);
collector.emit(new Values(count++ + ""));
}
}
}
public class TestBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
System.out.println(input.getString(0));
Thread.sleep(1000);
collector.ack(input);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Exception");
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}