
I have a requirement to process batches of tuples with Storm. My last bolt has to wait until the topology has received the entire batch, and only then can it do its processing. To avoid confusion: by a batch I mean a set of N messages that arrive in real time; the term has nothing to do with batch processing in the Hadoop sense. Even 2 messages can be a batch.

From reading Storm's documentation, is it safe to say that Storm doesn't inherently support this kind of batch processing (batch = N messages in real time)?

I know about Trident. I am not using it, although I did test it a little. Its batch spout concept is exactly what I am looking for, because you can build a batch using the collector and the spout emits those messages as a single batch. But Trident aside, what does plain Storm offer?

My approach to this problem uses tuple acknowledgment and some timeout hacks (or maybe they are not hacks?). I use RabbitMQ as the message broker, and I wrote a spout that takes messages from a queue and sends them downstream as tuples until there are no more messages in the queue. These tuples pass through 3-4 stages (bolts), and when they reach the final bolt they stop there. I need to stop them (not emit anything) because, as I said, the last bolt has to process the result of the entire batch and then emit only one tuple (the final result).

So how does the final bolt know when to start processing? I made the spout responsible for signaling. When the spout has no tuples to emit, it sleeps for 10 ms. Once it has been idle for, say, 1000 ms in total, it goes into a READY state (it is ready to emit an END-OF-BATCH or TIME-OUT signal).

But another condition must be met: I can't send the signal until I am sure that all tuples have reached the final bolt. I use tuple acknowledgment to track this: when a tuple reaches the final bolt, it gets acked. Once all tuples have been acked and the spout has timed out, the spout sends the signal, and the final bolt can process the result of that batch.
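The bookkeeping described above can be modeled in plain Java as a minimal sketch. EndOfBatchDetector is a hypothetical helper, not part of Storm. The assumption is that the spout calls onEmit whenever nextTuple() emits a tuple with a message ID, calls onAck from ISpout.ack(Object msgId), and asks shouldSignal whenever the RabbitMQ queue is empty. Timestamps are passed in explicitly so the logic can be tested without sleeping.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not a Storm class) modeling the spout-side logic:
// signal END-OF-BATCH only when the spout has been idle long enough AND
// every emitted tuple has been acked by the final bolt.
class EndOfBatchDetector {
    private final long idleThresholdMillis;              // e.g. 1000 ms
    private final Set<Object> pending = new HashSet<>(); // emitted, not yet acked
    private long lastEmitMillis;
    private boolean emittedSinceSignal = false;

    EndOfBatchDetector(long idleThresholdMillis, long nowMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
        this.lastEmitMillis = nowMillis;
    }

    // Assumed to be called from nextTuple() right after emitting with msgId.
    void onEmit(Object msgId, long nowMillis) {
        pending.add(msgId);
        lastEmitMillis = nowMillis;
        emittedSinceSignal = true;
    }

    // Assumed to be called from the spout's ack(Object msgId).
    void onAck(Object msgId) {
        pending.remove(msgId);
    }

    // Returns true at most once per batch; resets so the next batch starts clean.
    boolean shouldSignal(long nowMillis) {
        boolean idle = nowMillis - lastEmitMillis >= idleThresholdMillis;
        if (emittedSinceSignal && idle && pending.isEmpty()) {
            emittedSinceSignal = false;
            return true;
        }
        return false;
    }
}
```

Failed tuples (ISpout.fail) are deliberately not handled here: a tuple that is never acked keeps the batch open indefinitely, so a real spout would need its own policy for them, such as re-emitting or ending the batch with a TIME-OUT signal.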

So my question to you, my dear Storm gurus: is this topology badly designed, and does it look like a hack? Is there a better way to do this?

Vor: I'm not going to put this in an answer, but because of Storm's distributed nature, you might want to consider aggregating tuples in shared memory (Redis), where each tuple has a value indicating its state. Then, using Storm's tick tuples, you can periodically check whether all the tuples in a given batch were processed.
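A minimal sketch of this state-tracking idea, assuming a plain in-memory map stands in for Redis (so, unlike Redis, it only works inside a single JVM). BatchStateStore and its methods are hypothetical names. In Storm, the periodic check could be driven by tick tuples: setting Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS (for example in the bolt's getComponentConfiguration()) makes Storm send the bolt periodic tuples on the Constants.SYSTEM_TICK_STREAM_ID stream, and on each of those the bolt would call isComplete.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the shared store (Redis in the comment above).
// Each tuple ID maps to a state; a periodic check reports whether every
// tuple registered for a batch has reached DONE.
class BatchStateStore {
    enum State { IN_FLIGHT, DONE }

    // batchId -> (tupleId -> state)
    private final Map<String, Map<String, State>> batches = new HashMap<>();

    // Assumed to be called by the spout before emitting each tuple.
    void markInFlight(String batchId, String tupleId) {
        batches.computeIfAbsent(batchId, k -> new HashMap<>()).put(tupleId, State.IN_FLIGHT);
    }

    // Assumed to be called when a tuple reaches the final bolt.
    void markDone(String batchId, String tupleId) {
        batches.computeIfAbsent(batchId, k -> new HashMap<>()).put(tupleId, State.DONE);
    }

    // What the final bolt would check on each tick tuple.
    boolean isComplete(String batchId) {
        Map<String, State> tuples = batches.get(batchId);
        return tuples != null && !tuples.isEmpty()
                && tuples.values().stream().allMatch(s -> s == State.DONE);
    }
}
```

Because the batch size isn't known in advance, this check alone can't tell whether more tuples are still on their way; it has to be combined with some end-of-input marker, such as the spout's idle timeout.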
filip: Could it be done without shared memory? Does Storm offer anything more?
Vor: Is the batch size known ahead of time?
filip: The batch size isn't known in advance. Don't get me wrong, my topology works; I just want to know if this is the cleanest way to process a batch of tuples without using fancy stuff like Redis. One of the problems here is that I am using acknowledgments for signaling instead of for guaranteeing message processing (although maybe I could tweak that too).
Vor: Do you have control over the message format? I'm asking because if you do, you can simply publish your messages to separate queues (one queue per batch), then send a control message to your "main" queue with all the details about the batch queue, and consume the messages from there. Once complete, simply delete the queue, or let RabbitMQ do it by declaring the queue with auto_delete.
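The one-queue-per-batch idea can be sketched with in-memory queues standing in for RabbitMQ. PerBatchQueues, the queue names, and the method names are all hypothetical. The point it illustrates is that the producer, which knows where a batch ends, announces each complete batch explicitly, so the consumer never has to infer batch boundaries from timeouts or acks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model (not RabbitMQ) of "one queue per batch + control message".
class PerBatchQueues {
    private final Map<String, Deque<String>> batchQueues = new HashMap<>();
    private final Deque<String> controlQueue = new ArrayDeque<>(); // the "main" queue

    // Producer publishes each message of a batch to that batch's own queue.
    void publish(String batchQueue, String message) {
        batchQueues.computeIfAbsent(batchQueue, k -> new ArrayDeque<>()).add(message);
    }

    // Producer announces the batch only after all its messages are published.
    void publishControl(String batchQueue) {
        controlQueue.add(batchQueue);
    }

    // Consumer: returns the next complete batch, or null if none is announced.
    // Removing the map entry plays the role of deleting the batch queue.
    List<String> nextBatch() {
        String name = controlQueue.poll();
        if (name == null) {
            return null;
        }
        Deque<String> q = batchQueues.remove(name);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }
}
```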

2 Answers


Storm also provides transactional topologies (https://storm.incubator.apache.org/documentation/Transactional-topologies.html). Although they are deprecated in favor of Trident, they are implemented on top of standard Storm bolts and spouts, so there's no reason you couldn't keep using them. You could also consider splitting your process into two topologies, where the second one waits until there are N messages to process in RabbitMQ.
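If N is fixed, "wait until there are N messages" reduces to a size-triggered buffer. A minimal sketch, assuming a hypothetical SizeBatcher class (not a Storm API) that the consuming side feeds one message at a time:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical size-triggered buffer: releases items as one batch once N arrive.
class SizeBatcher<T> {
    private final int batchSize;
    private List<T> buffer = new ArrayList<>();

    SizeBatcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Returns the completed batch when this item fills it, otherwise null.
    List<T> add(T item) {
        buffer.add(item);
        if (buffer.size() < batchSize) {
            return null;
        }
        List<T> full = buffer;
        buffer = new ArrayList<>(); // start the next batch with a fresh buffer
        return full;
    }
}
```

This only fits when N is known up front, which the asker says it isn't; with a variable batch size, some explicit end-of-batch marker is still needed.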


My implementation replaces storm.trident.planner.processor.EachProcessor and adds a new BatchBoundaryFunction interface that extends the original Function. This exposes EachProcessor's startBatch and finishBatch methods to the function, so it knows exactly when a batch of tuples starts and when it finishes.

package storm.trident.operation;

import storm.trident.planner.ProcessorContext;

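/**
 * A Trident Function that is also notified at batch boundaries.
 * The patched EachProcessor calls these hooks for every batch.
 */
public interface BatchBoundaryFunction extends Function
{
    // Called when a new batch starts, before its tuples reach execute()
    void startBatch(ProcessorContext processorContext, TridentCollector collector);

    // Called when the batch is finished, after its tuples went through execute()
    void finishBatch(ProcessorContext processorContext, TridentCollector collector);
}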



package storm.trident.planner.processor;

import java.util.List;
import java.util.Map;

import storm.trident.operation.BatchBoundaryFunction;
import storm.trident.operation.Function;
import storm.trident.operation.TridentOperationContext;
import storm.trident.planner.ProcessorContext;
import storm.trident.planner.TridentProcessor;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTuple.Factory;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

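// Patched copy of Trident's EachProcessor (same package and class name) that
// forwards startBatch/finishBatch to BatchBoundaryFunction implementations.
public class EachProcessor implements TridentProcessor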
{
    Function _function;

    TridentContext _context;

    AppendCollector _collector;

    Fields _inputFields;

    ProjectionFactory _projection;

    public EachProcessor(Fields inputFields, Function function)
    {
        _function = function;
        _inputFields = inputFields;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext)
    {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1)
        {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup()
    {
        _function.cleanup();
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple)
    {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

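    @Override
    public void startBatch(ProcessorContext processorContext)
    {
        // Forward the batch-start hook only to functions that opted in.
        // Note: _collector's context is only set in execute(), so anything
        // emitted here reuses the context/tuple from the previous execute()
        // call (none yet for the very first batch).
        if (_function instanceof BatchBoundaryFunction)
        {
            ((BatchBoundaryFunction) _function).startBatch(processorContext, _collector);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext)
    {
        // Forward the batch-end hook; this is where a function can emit a
        // single result for the whole batch (via the context set by the last
        // execute() call).
        if (_function instanceof BatchBoundaryFunction)
        {
            ((BatchBoundaryFunction) _function).finishBatch(processorContext, _collector);
        }
    }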

    @Override
    public Factory getOutputFactory()
    {
        return _collector.getOutputFactory();
    }
}
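To make the call order concrete without a Storm dependency, here is a simplified model of what the patched EachProcessor does for one batch. BatchHooks, CountPerBatch and BatchDriver are hypothetical stand-ins, not Trident types, and a Consumer<Object> replaces TridentCollector. The example function resets its state in startBatch, accumulates in execute, and emits one result per batch in finishBatch, which is the pattern the question's final bolt needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for BatchBoundaryFunction (not the real Trident API).
interface BatchHooks<T> {
    void startBatch(Consumer<Object> emit);
    void execute(T tuple, Consumer<Object> emit);
    void finishBatch(Consumer<Object> emit);
}

// Example: counts the tuples in a batch and emits the count once, at the end.
class CountPerBatch implements BatchHooks<String> {
    private int count;

    public void startBatch(Consumer<Object> emit) { count = 0; }

    public void execute(String tuple, Consumer<Object> emit) { count++; }

    public void finishBatch(Consumer<Object> emit) { emit.accept(count); }
}

// Mimics the patched EachProcessor's call order for a single batch.
class BatchDriver {
    static <T> List<Object> runBatch(BatchHooks<T> fn, List<T> tuples) {
        List<Object> out = new ArrayList<>();
        Consumer<Object> emit = out::add; // collects everything "emitted"
        fn.startBatch(emit);
        for (T t : tuples) {
            fn.execute(t, emit);
        }
        fn.finishBatch(emit);
        return out;
    }
}
```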