
We have a fairly simple Storm topology with one headache.

One of our bolts can either find the data it is processing to be valid, in which case everything carries on downstream as normal, or it can find the data invalid but fixable, in which case we need to send it for some additional processing.

We tried making this step part of the topology with a separate bolt and stream.

// in declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declareStream(NORMAL_STREAM, getStreamFields());
declarer.declareStream(ERROR_STREAM, getErrorStreamFields());

Followed by something like the following at the end of the execute method.

if (errorOutput != null) {
    // route fixable data to the error stream, anchored on the input tuple
    collector.emit(ERROR_STREAM, input, errorOutput);
} else {
    collector.emit(NORMAL_STREAM, input, output);
}

collector.ack(input);
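For reference, the two snippets above can be combined into one complete bolt. This is only a sketch: the class name `ValidatingBolt`, the field names, and the `isValid` check are illustrative stand-ins for the real validation logic, and it assumes the Storm 2.x `org.apache.storm` packages.

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ValidatingBolt extends BaseRichBolt {
    public static final String NORMAL_STREAM = "normal";
    public static final String ERROR_STREAM = "error";

    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String data = input.getStringByField("data");
        if (isValid(data)) {
            // anchor on the input tuple so downstream failures are replayed
            collector.emit(NORMAL_STREAM, input, new Values(data));
        } else {
            collector.emit(ERROR_STREAM, input, new Values(data, "invalid"));
        }
        // ack exactly once, after the tuple has been routed
        collector.ack(input);
    }

    // placeholder validation; replace with the real check
    public static boolean isValid(String data) {
        return data != null && !data.isEmpty();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(NORMAL_STREAM, new Fields("data"));
        declarer.declareStream(ERROR_STREAM, new Fields("data", "reason"));
    }
}
```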

This does work, but it has the breaking side effect of causing all of the tuples that do not go down the error path to fail and be re-sent by the spout endlessly.

I think this is because the error bolt cannot ack messages it never receives, but the acker waits for all the bolts in the topology to ack before reporting success back to the spout. At the very least, removing the error-processing bolt causes everything to get acked back to the spout correctly.

What is the best way to achieve something like this?


1 Answer


It's possible that the error bolt is slower than you suspect, causing a backlog on ERROR_STREAM which, in turn, backs up into your first bolt and finally causes tuples to start timing out. When a tuple times out, it gets re-sent by the spout.

Try:

  1. Increasing the timeout config (topology.message.timeout.secs),
  2. Limiting the number of in-flight tuples from the spout (topology.max.spout.pending), and/or
  3. Increasing the parallelism hint for your bolts
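A sketch of what those three settings look like in code. The numbers here are illustrative placeholders, not recommendations, and the bolt id/class in the commented line are hypothetical; tune the values for your workload.

```java
import org.apache.storm.Config;

Config conf = new Config();

// 1. topology.message.timeout.secs: raise from the 30s default
conf.setMessageTimeoutSecs(120);

// 2. topology.max.spout.pending: cap in-flight tuples per spout task
conf.setMaxSpoutPending(500);

// 3. parallelism hint, set when wiring the topology, e.g.:
// builder.setBolt("validator", new ValidatingBolt(), 4);
```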