We have a fairly simple Storm topology with one headache.
One of our bolts can find the data it is processing to be either valid, in which case everything carries on down the stream as normal, or invalid but fixable, in which case we need to send it for some additional processing.
We tried making this step part of the topology, using a separate bolt and stream:
```java
declarer.declareStream(NORMAL_STREAM, getStreamFields());
declarer.declareStream(ERROR_STREAM, getErrorStreamFields());
```
This was followed by something like this at the end of the execute method:
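```java
if (errorOutput != null) {
    // Invalid but fixable: send it down the error stream, anchored to the input.
    collector.emit(ERROR_STREAM, input, errorOutput);
} else {
    // Valid: continue down the normal stream, anchored to the input.
    collector.emit(NORMAL_STREAM, input, output);
}
collector.ack(input);
```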
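To check the branching on its own, the routing decision can be pulled out of Storm and run against a recording stand-in for the collector. This is a minimal sketch, not Storm's API: `RecordingCollector`, `RoutingSketch`, `route`, and the string values `"normal"` and `"error"` are all hypothetical. It shows the intended invariant of the snippet above: exactly one anchored emit per input, on one of the two streams, followed by exactly one ack.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Storm's OutputCollector that only records calls,
// so the routing logic can be exercised without a running topology.
class RecordingCollector {
    final List<String> emittedStreams = new ArrayList<>();
    final List<Object> acked = new ArrayList<>();

    void emit(String streamId, Object anchor, Object values) {
        emittedStreams.add(streamId);
    }

    void ack(Object input) {
        acked.add(input);
    }
}

class RoutingSketch {
    // Hypothetical stream ids; the real constant values are not shown in the question.
    static final String NORMAL_STREAM = "normal";
    static final String ERROR_STREAM = "error";

    // Mirrors the end of execute(): one emit on exactly one stream, then one ack.
    static void route(RecordingCollector collector, Object input,
                      Object output, Object errorOutput) {
        if (errorOutput != null) {
            collector.emit(ERROR_STREAM, input, errorOutput);
        } else {
            collector.emit(NORMAL_STREAM, input, output);
        }
        collector.ack(input);
    }
}
```

Routing one valid and one fixable input through `route` records `["normal", "error"]` as the emitted streams and both inputs as acked, so each input is acked once regardless of which branch it takes.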
This does work; however, it has the side effect of causing all of the tuples that do not go down the error path to fail and be re-sent by the spout endlessly.
I think this is because the error bolt cannot send acks for tuples it never receives, while the acker waits for every bolt in the topology to ack before acking back to the spout. At the very least, taking out the error-processing bolt causes everything to be acked back to the spout correctly.
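For comparison, Storm's documentation on guaranteeing message processing describes the acker as tracking the tree of tuples spawned from each spout tuple, using a running XOR of tuple ids, rather than tracking every bolt in the topology. The toy model below is a simplified illustration of that bookkeeping, not Storm code; `AckerModel` and its methods are hypothetical names. Under this model, a bolt that never receives a tuple from a given tree has nothing to ack for it, and a tree stays open only while some tuple actually emitted into it remains unacked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Toy model (NOT Storm code) of XOR-based tuple-tree tracking.
// All names here are hypothetical.
class AckerModel {
    // Running XOR of tuple ids, keyed by the root (spout) tuple id.
    private final Map<Long, Long> ackVals = new HashMap<>();
    private final Random random = new Random(42);

    // The spout emits a root tuple: start its tree with the root tuple's id.
    long spoutEmit(long rootId) {
        long tupleId = random.nextLong();
        ackVals.put(rootId, tupleId);
        return tupleId;
    }

    // A bolt emits a new tuple anchored to the tree: XOR its id in once.
    long emitAnchored(long rootId) {
        long childId = random.nextLong();
        ackVals.merge(rootId, childId, (a, b) -> a ^ b);
        return childId;
    }

    // A bolt acks a tuple it received: XOR the same id in a second time,
    // cancelling the contribution made when the tuple was emitted.
    void ack(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    // The tree is complete once every emitted id has been acked (XOR == 0).
    // A root that was never registered also reports complete here.
    boolean isComplete(long rootId) {
        return ackVals.getOrDefault(rootId, 0L) == 0L;
    }
}
```

For example, if the bolt emits one tuple on the normal stream anchored to its input, acks the input, and the downstream normal-stream bolt acks that tuple, `isComplete` returns true even though the error bolt never saw anything from that tree. Skipping that final ack leaves the tree open, which in Storm would eventually time out and be failed back to the spout.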
What is the best way to achieve something like this?