
Here is the code I am trying to execute. I am intentionally failing tuples in the bolt so that I can see the failed messages being replayed by Storm, but it looks like this is not happening.

public static class FastRandomSentenceSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   Random _rand;
   private static final String[] CHOICES = {
       "marry had a little lamb whos fleese was white as snow",
       "and every where that marry went the lamb was sure to go"
   };

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
      _rand = ThreadLocalRandom.current();
   }

   @Override
   public void nextTuple() {
      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
      // Emit with the sentence itself as the message id so the tuple is
      // tracked and fail()/ack() are called back on this spout.
      _collector.emit(new Values(sentence), sentence);
   }

   @Override
   public void fail(Object id) {
      // Replay the failed sentence by re-emitting it with the same message id.
      System.out.println("RAVI: the failedObjectId = " + id);
      _collector.emit(new Values(id), id);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("sentence"));
   }
}
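
The spout passes the sentence itself as the message id when emitting, which is what lets Storm call fail() back on it. Purely for illustration, a minimal sketch of an ack() override (not in the original code) that logs fully processed ids could look like this:

   @Override
   public void ack(Object id) {
      // Hypothetical logging, mirroring the fail() handler above, so acked
      // and failed message ids can be compared in the worker logs.
      System.out.println("RAVI: the ackedObjectId = " + id);
   }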

Here are the details of the SplitSentence bolt, where I intentionally fail tuples.

public static class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
       _collector = collector;
    }

This is the method where the intentional failure happens:

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        System.out.println("sentence = " + sentence);
        if (sentence.equals("marry had a little lamb whos fleese was white as snow")) {
           System.out.println("going to fail");
           // Intentionally fail this tuple; the spout's fail() should then be invoked.
           _collector.fail(tuple);
        } else {
           for (String word : sentence.split("\\s+")) {
              // Anchor each emitted word to the input tuple.
              _collector.emit(tuple, new Values(word, 1));
           }
           _collector.ack(tuple);
        }
    }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word", "count"));
     }
  }
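
As an aside, the same intentional failure can also be written with BaseBasicBolt, which acks automatically and turns a FailedException into a tuple failure. This is only a sketch of an alternative, not the code actually deployed; the class name SplitSentenceBasic is hypothetical:

 public static class SplitSentenceBasic extends BaseBasicBolt {
     @Override
     public void execute(Tuple tuple, BasicOutputCollector collector) {
         String sentence = tuple.getString(0);
         if (sentence.equals("marry had a little lamb whos fleese was white as snow")) {
             // Throwing FailedException marks the input tuple as failed,
             // so the spout's fail() is invoked (when ackers are enabled).
             throw new FailedException("intentionally failing: " + sentence);
         }
         for (String word : sentence.split("\\s+")) {
             collector.emit(new Values(word, 1));
         }
         // No explicit ack: BaseBasicBolt acks the input automatically.
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word", "count"));
     }
 }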

Here are the details of the driver code.

public static void main(String[] args) throws Exception {

   TopologyBuilder builder = new TopologyBuilder();

   builder.setSpout("spout", new FastRandomSentenceSpout(), 4);

   builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");


   Config conf = new Config();
   conf.registerMetricsConsumer(
             org.apache.storm.metric.LoggingMetricsConsumer.class);


   String name = "wc-test";
   if (args != null && args.length > 0) {
       name = args[0];
   }

   conf.setNumWorkers(1);
   StormSubmitter.submitTopologyWithProgressBar(name, 
                                                conf,
                                                builder.createTopology());

  }
Is there a global cluster-level setting which overrides conf.setNumAckers()? I tried explicitly setting conf.setNumAckers(1), but still got the same result. - shanker861

1 Answer


It turns out it was due to a global setting in storm.yaml. The specific setting was:

topology.acker.executors: 0
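
With that value the cluster starts no acker executors, so emitted tuples are never tracked and the spout's fail() is never called. A minimal sketch of the fix, assuming a single acker executor is enough for this test topology, is to change the entry in storm.yaml (or remove it so the default applies) and resubmit the topology:

topology.acker.executors: 1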