I have created a Storm topology with a Spout that emits a number of tuples for benchmarking. I want to stop/kill my topology once all the tuples are emitted from the spout or there are no longer any tuples flowing in the topology.
Here is what my topology looks like.
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
//Disabled ACK'ing for higher throughput
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
LoadGeneratorSource loadGenerator = new LoadGeneratorSource(runtime,numberOfTuplesToBeEmitted);
builder.setSpout("loadGenerator", loadGenerator);
//Some Bolts Here
while (loadGenerator.isRunning()){
//Active Waiting
}
//DO SOME STUFF WITH JAVA
cluster.killTopology("StormBenchmarkTopology");
The problem is that the loadGenerator instance that I'm referring in this scope is different than the one running in the spout thread. Hence, isRuning() is always returning true, even though inside the spout thread its value is false when there are no more tuples to be emitted.
Here is a part of LoadGeneratorSource class.
public class LoadGeneratorSource extends BaseRichSpout {
private final int throughput;
private boolean running;
private final long runtime;
public LoadGeneratorSource(long runtime,int throughput) {
this.throughput = throughput;
this.runtime = runtime;
}
@Override
public void nextTuple() {
ThroughputStatistics.getInstance().pause(false);
long endTime = System.currentTimeMillis() + runtime;
while (running) {
long startTs = System.currentTimeMillis();
for (int i = 0; i < throughput; i++) {
try {
emitValue(readNextTuple());
} catch (Exception e) {
e.printStackTrace();
}
}
while (System.currentTimeMillis() < startTs + 1000) {
// active waiting
}
if (endTime <= System.currentTimeMillis())
setRunning(false);
}
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
//MORE STUFF
}
Can someone tell me a way to stop my topology once there are no more tuples emitted from the spout or flowing in the topology? Thanks for your help in advance.