I tried to create a small example in Trident. The goal was to see how tuples are replayed in Case of failures. Below is the topology definition
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple -> {
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1)) {
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
}
return new Values(tridentTuple.toArray());
})
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
I use the RandomIntegerSpout
from storm-starter which extends BaseRichSpout
and just generates random numbers. I then apply a MapFunction
that just draws a random number every 50 tuples and randomly fails the tuple.
The Problem is, I do not get any ack
s or fail
s.
I played around with the spout and ran it in debug mode, tried same sample output, tried it with standard storm bolts. The anchoring is working fine, it just does not get called by trident.
I reproduced this problem with LocalCluster and StormSubmitter, in v1.2.3 and v2.0.0.
Below is a screenshot of the Storm UI: The bolts corresponding to the map ack and fail the tuple as expected, but this is are never propagated back to the spout.
I thought the trident mastercoord might expect some kind of persistence in a state to realize the topology is done, but replacing peek by some persistentAggregate did not help. I also ruled out a bug in map
by doing the same with each
.
Seeing the code is almost trivial by inspection I probably misunderstand something fundamental about Trident / Storm. Am I wrong to expect trident to call the spout's and ack
method if a batch is done? I realized there is no fail
method in IBatchSpout
. how does Trident handle replaying of batches??