I have a topology with one spout reading from two SQS queues and five bolts. After processing, when I try to ack from the second bolt, the tuple is not getting acked.

I'm running the topology in reliable mode and trying to ack in the last bolt. I get the log message below, as if the tuples were getting acked, but the messages are not deleted from the queue and my overridden ack() methods are never called. It looks like it calls the default ack method in backtype.storm.task.OutputCollector instead of the overridden method in my spout.

8240 [Thread-24-conversionBolt] INFO  backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]

I have anchored the message ID to the tuple in my SQS queue spout and am emitting it to the first bolt:

collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());

I have the ack() and fail() methods overridden in my queue spout. The default visibility timeout on the queues has been set to 30 seconds.
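Note that Storm's own tuple timeout (topology.message.timeout.secs) also defaults to 30 seconds. A minimal sketch of pinning it explicitly so it stays within the SQS visibility window, using the standard backtype.storm.Config setter:

Config conf = new Config();
// Keep Storm's tuple timeout at or below the SQS visibility timeout;
// otherwise a message can become visible again while its tuple is still in flight.
conf.setMessageTimeoutSecs(30);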

Code snippet from my topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("firstQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getFirstQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setSpout("secondQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getSecondQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setBolt("transformerBolt", new TransformerBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("firstQueueSpout")
        .shuffleGrouping("secondQueueSpout");

builder.setBolt("conversionBolt", new ConversionBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("transformerBolt");

// Dispatches to the corresponding bolts based on packet type
builder.setBolt("dispatchBolt", new DispatcherBolt(),
        StormConfigurations.getDispatcherBoltThreads())
        .shuffleGrouping("conversionBolt");

Code snippet from SqsQueueSpout (extends BaseRichSpout):

    @Override
    public void nextTuple() {
        if (queue.isEmpty()) {
            ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
            queue.addAll(receiveMessageResult.getMessages());
        }
        Message message = queue.poll();
        if (message != null) {
            try {
                JSONParser parser = new JSONParser();
                JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
                if (reliable) {
                    // Anchor the tuple with the SQS receipt handle as the message ID.
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
                } else {
                    // Unreliable mode: delete the message right away.
                    sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()));
                }
            } catch (ParseException e) {
                LOG.error("ParseException in SqsQueueSpout.nextTuple(): ", e);
            }
        } else {
            // Still empty, go to sleep.
            Utils.sleep(sleepTime);
        }
    }

    public String getStreamId(Message message) {
        return Utils.DEFAULT_STREAM_ID;
    }

    public int getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(int sleepTime) 
    {
        this.sleepTime = sleepTime;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("......Inside ack in sqsQueueSpout..............."+msgId);
        // Only called in reliable mode.
        try {
            sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void fail(Object msgId) {
        // Only called in reliable mode.
        try {
            sqs.changeMessageVisibilityAsync(
                    new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void close() {
        sqs.shutdown();
        ((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

Code snippet from my first bolt (extends BaseRichBolt):

public class TransformerBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // some code here to convert the JSON string to a map;
            // Map dataMap and long packetId are sent to the next bolt
            this.collector.emit(input, new Values(dataMap, packetId));
        }
        catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS message to HashMap :{}", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}

Code snippet from my second bolt:

public class ConversionBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input)
    {
        try {
            Map dataMap = (Map) input.getValue(0);
            Long packetId = (Long) input.getValue(1);

            // this ack is not working
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

Kindly let me know if you need more information. Could somebody shed some light on why the overridden ack() in my spout is not getting called (when I ack from my second bolt)?

1 Answer

You must ack all incoming tuples in all bolts, i.e., add collector.ack(input) to TransformerBolt.execute(Tuple input).
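For example, a minimal sketch of TransformerBolt.execute with the ack added (the JSON-to-map conversion is elided, as in the question):

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // ... convert eventStr to dataMap and packetId as before ...
            // Emit anchored to the input tuple so the tuple tree stays intact,
            // then ack the input so the ackers can mark this hop as processed.
            this.collector.emit(input, new Values(dataMap, packetId));
            this.collector.ack(input);
        } catch (Exception e) {
            // Failing the tuple lets the spout's fail() make the SQS message visible again.
            this.collector.fail(input);
        }
    }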

The log message you see is correct: your code calls collector.ack(...) and that call gets logged. However, a call to ack in a bolt is not a call to Spout.ack(...). Each time a spout emits a tuple with a message ID, that ID gets registered by the running ackers of your topology. The ackers receive a message for each ack a bolt performs, collect them, and notify the spout only when all acks for a tuple tree have been received. When the spout receives this message from an acker, it calls its own ack(Object messageId) method.
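The ackers themselves are ordinary tasks and are enabled by default. A sketch of submitting the topology with at least one acker explicitly configured (conf.setNumAckers and StormSubmitter are the standard Storm APIs; the topology name here is made up):

Config conf = new Config();
// At least one acker must run for reliable processing; setting this to 0
// disables tracking and makes Storm ack tuples immediately at the spout.
conf.setNumAckers(1);

StormSubmitter.submitTopology("sqs-topology", conf, builder.createTopology());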

See here for more details: https://storm.apache.org/documentation/Guaranteeing-message-processing.html