I tried to create a counter in a bolt to count how many times it receives a tuple:

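import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class CounterBolt extends BaseRichBolt {
    OutputCollector outputCollector;
    int count; // number of tuples received on the "GotResult" stream

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        outputCollector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        if (tuple.getSourceStreamId().equals("GotResult")) {
            count++;
        } else {
            System.out.println(count); // check count
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing
    }
}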

I realise that count becomes 0 because a new instance of CounterBolt is created every time in the Storm topology.

One approach I can think of is to keep the counter in external storage, such as a database or a message broker like Redis. My Java knowledge is not yet advanced, so is there a proper way to do this?
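
To make the idea concrete, here is a rough sketch of what I have in mind. CounterStore and InMemoryCounterStore are names I made up for illustration and are not part of Storm. The in-memory version only stands in for the external store, and a real one would presumably do an atomic increment in Redis or a database instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical abstraction (not a Storm API): the bolt would call this
// instead of keeping the count in its own field.
interface CounterStore {
    long increment(String key); // adds 1 and returns the new value
    long get(String key);       // returns 0 if the key was never incremented
}

// In-memory stand-in for an external store such as Redis or a database.
class InMemoryCounterStore implements CounterStore {
    private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    @Override
    public long increment(String key) {
        // Create the counter on first use, then increment it atomically.
        return counters.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    @Override
    public long get(String key) {
        AtomicLong value = counters.get(key);
        return value == null ? 0L : value.get();
    }
}
```

In execute, the bolt would then call something like store.increment("GotResult") rather than count++, so the value no longer depends on which CounterBolt instance handles the tuple.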