I have an infinite hot flux of data. I am about to engage in carrying out an operation on each element in the stream, each of which returns a Mono which will complete (one way or another) after some finite time.
There is the possibility of an error being thrown from these operations. If so, I want to resubscribe to the hot flux without missing anything, retrying elements that were in the middle of being processed when the error was thrown (i.e. anything that did not complete successfully).
What do I do here? I can tolerate repeated operations on the same elements, but not losing elements entirely from the stream.
I've attempted to use a ReplayProcessor to handle this, but I can't see a way of making it work without repeating a lot of operations that might well have succeeded (using a very conservative timeout), or losing elements due to new elements overriding old ones in the buffer (as below).
Test case:
@Test
public void fluxTest() {
List<String> strings = new ArrayList<>();
strings.add("one");
strings.add("two");
strings.add("three");
strings.add("four");
ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();
//Goes boom after three uses of its method, otherwise
//returns a mono. completing after a little time
DangerousClass dangerousClass = new DangerousClass(3);
ReplayProcessor<String> replay = ReplayProcessor.create(3);
flux.subscribe(replay);
replay.flatMap(dangerousClass::doThis)
.retry(1)
.doOnNext(s -> LOG.info("Completed {}", s))
.subscribe();
flux.connect();
flux.blockLast();
}
public class DangerousClass {
Logger LOG = LoggerFactory.getLogger(DangerousClass.class);
private int boomCount;
private AtomicInteger count;
public DangerousClass(int boomCount) {
this.boomCount = boomCount;
this.count = new AtomicInteger(0);
}
public Mono<String> doThis(String s) {
return Mono.fromSupplier(() -> {
LOG.info("doing dangerous {}", s);
if (count.getAndIncrement() == boomCount) {
LOG.error("Throwing exception from {}", s);
throw new RuntimeException("Boom!");
}
return s;
}).delayElement(Duration.ofMillis(600));
}
}
This prints:
doing dangerous one
doing dangerous two
doing dangerous three
doing dangerous four
Throwing exception from four
doing dangerous two
doing dangerous three
doing dangerous four
Completed four
Completed two
Completed three
One is never completed.