I am trying to implement guaranteed message processing, but the ack and fail methods on the spout are never called.
I emit each tuple from the spout with a message ID object. In each bolt I anchor the emitted tuple to the input tuple and call collector.ack(tuple).
Question: why are ack and fail never called on the spout?
Here is a shortened code sample.
Spout Code using BaseRichSpout
    public void nextTuple() {
        for (String usage : usageData) {
            .... further code ....
            String msgID = UUID.randomUUID().toString()
                    + System.currentTimeMillis();
            Values value = new Values(splitUsage[0], splitUsage[1],
                    splitUsage[2], msgID);
            outputCollector.emit(value, msgID);
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pendingTuples.remove(msgId);
        LOG.info("Ack " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Re-emit the tuple
        LOG.info("Fail " + msgId);
        this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
    }
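To make clear what ack and fail above are expected to do, here is a minimal Storm-free sketch of the bookkeeping I have in mind. It assumes pendingTuples is a map from message ID to the emitted values, filled in at emit time (that part is in the elided code). The class name PendingTuples and the track method are hypothetical names used only for this illustration, not Storm API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Storm: models the spout-side map only.
class PendingTuples {
    // msgId -> values emitted with that id, kept until the tuple is acked
    private final Map<Object, List<Object>> pending = new ConcurrentHashMap<>();

    // Intended to be called next to emit(values, msgId) in nextTuple().
    void track(Object msgId, List<Object> values) {
        pending.put(msgId, values);
    }

    // Mirrors ack(Object msgId): the tuple is done, so forget it.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // Mirrors fail(Object msgId): returns the values to re-emit,
    // or null if the id was never tracked or was already acked.
    List<Object> fail(Object msgId) {
        return pending.get(msgId);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingTuples pending = new PendingTuples();
        String msgId = UUID.randomUUID().toString();
        pending.track(msgId, Arrays.<Object>asList("field0", "field1", "field2", msgId));
        System.out.println("replayable after fail: " + (pending.fail(msgId) != null));
        pending.ack(msgId);
        System.out.println("pending after ack: " + pending.size());
    }
}
```

In the real spout, neither of these callbacks is reached at all, which is the problem I am asking about.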
Bolt Code using BaseRichBolt
    @Override
    public void execute(Tuple inputTuple) {
        this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
        this.outputCollector.ack(inputTuple);
    }
Final Bolt
    @Override
    public void execute(Tuple inputTuple) {
        ..... Simply reports does not emit .....
        this.outputCollector.ack(inputTuple);
    }