0
votes

I tried to create a small example in Trident. The goal was to see how tuples are replayed in Case of failures. Below is the topology definition

        Random rand = new Random();

        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);

        TridentTopology topology = new TridentTopology();

        topology.newStream("spout", new RandomIntegerSpout())
                .map((MapFunction) tridentTuple -> {
                    if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
                            (rand.nextInt(2) == 1)) {
                        System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
                        throw new ReportedFailedException("Divisible by 50");
                    }
                    return new Values(tridentTuple.toArray());
                })
                .peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));

I use the RandomIntegerSpout from storm-starter which extends BaseRichSpout and just generates random numbers. I then apply a MapFunction that just draws a random number every 50 tuples and randomly fails the tuple.

The Problem is, I do not get any acks or fails.

I played around with the spout and ran it in debug mode, tried same sample output, tried it with standard storm bolts. The anchoring is working fine, it just does not get called by trident.

I reproduced this problem with LocalCluster and StormSubmitter, in v1.2.3 and v2.0.0.

Below is a screenshot of the Storm UI: enter image description here The bolts corresponding to the map ack and fail the tuple as expected, but this is are never propagated back to the spout.

I thought the trident mastercoord might expect some kind of persistence in a state to realize the topology is done, but replacing peek by some persistentAggregate did not help. I also ruled out a bug in map by doing the same with each.

Seeing the code is almost trivial by inspection I probably misunderstand something fundamental about Trident / Storm. Am I wrong to expect trident to call the spout's and ack method if a batch is done? I realized there is no fail method in IBatchSpout. how does Trident handle replaying of batches??

1

1 Answers

1
votes

Trident spouts don't ack or fail tuples at the individual tuple level. Instead, tuples are acked as a batch.

Trident spouts will often look something like this interface.

M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);

The idea is that Trident will manage keeping track of acks/fails of the batch tuples, and then if the batch fails, it will ask the spout for to repeat the batch, and if not, it simply won't.

Note how this is different from a standard Storm spout. With a normal spout, the framework basically tells the spout "Hey, emit something. Up to you what you emit.", and then the ack and fail methods are used to tell the spout whether it should emit a particular tuple again.

With Trident, the spout is instead told "Hey, (re)emit batch number x", and it is then up to the spout to know which tuples were in that batch. With this model there's no need for a fail method. Some Trident spouts will have an ack/succeed method though, to allow the spout to drop any state it may have related to a particular in-progress batch.

For wrapped IRichSpouts, there's some bridging code that wraps them into the Trident API. Basically, the wrapper calls nextTuple until it has a full batch, then it stores the ids in a cache. If the wrapper is asked to reemit a batch, it calls fail on the spout. Otherwise, it calls ack once the batch has succeeded.

I think the reason you're not seeing anything in Storm UI related to this, is that the IRichBolt isn't actually represented there. Instead it's wrapped, so the ack/fail calls are happening "under the hood" inside the spout-spout component. If you want to know for sure whether ack/fail is being called, try adding some logging to the ack/fail methods of your IRichSpout.