I tried to create a counter to count how many tuples the bolt receives, like this:
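    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class CounterBolt extends BaseRichBolt {
        OutputCollector outputCollector;
        int count; // number of tuples seen on the "GotResult" stream

        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
            outputCollector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            if (tuple.getSourceStreamId().equals("GotResult")) {
                count++;
            } else {
                System.out.println(count); // check count
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    }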
I realise that count will become 0 because a new instance of CounterBolt is created every time in the Storm topology.
One approach I can think of is to keep the counter in external storage, for example a database or a message broker like Redis. My Java knowledge is not yet advanced, so is there a proper way to do this?
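To make the external-storage idea concrete, here is a minimal sketch of what I have in mind. The names CounterStore, InMemoryCounterStore, StreamCounter and CounterSketch are hypothetical and not part of Storm. The in-memory store only stands in for Redis so that the example runs without any dependency; a Redis-backed version would presumably use the INCR and GET commands instead of a map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical abstraction over wherever the count is kept (not a Storm API).
interface CounterStore {
    long increment(String key); // returns the value after incrementing
    long get(String key);       // returns 0 if the key was never incremented
}

// In-memory stand-in so the sketch runs on its own. A Redis-backed version
// would replace the map with atomic INCR / GET commands on the server.
final class InMemoryCounterStore implements CounterStore {
    private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

    @Override
    public long increment(String key) {
        return counts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    @Override
    public long get(String key) {
        AtomicLong value = counts.get(key);
        return value == null ? 0L : value.get();
    }
}

// Mirrors the branch in execute(): count "GotResult" tuples, otherwise read.
final class StreamCounter {
    private final CounterStore store;

    StreamCounter(CounterStore store) {
        this.store = store;
    }

    long onTuple(String sourceStreamId) {
        if ("GotResult".equals(sourceStreamId)) {
            return store.increment("GotResult");
        }
        return store.get("GotResult");
    }
}

public class CounterSketch {
    public static void main(String[] args) {
        CounterStore shared = new InMemoryCounterStore();
        // Two separate objects, like two bolt instances, sharing one store.
        StreamCounter a = new StreamCounter(shared);
        StreamCounter b = new StreamCounter(shared);
        a.onTuple("GotResult");
        b.onTuple("GotResult");
        System.out.println(b.onTuple("other")); // prints 2
    }
}
```

The point of the sketch is that the count lives outside the object that handles the tuple, so it does not matter which instance receives it. In a real bolt I assume the store client would have to be created in prepare() rather than passed to the constructor, but I am not sure whether that is the recommended way.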