1
votes

I have a Spring Boot application which contains a complex reactive flow (it involves MongoDB and RabbitMQ operations). Most of the time it works, but...

Some of the methods return a Mono<Void>. This is a typical pattern, in multiple layers:

fun workflowStep(things: List<Thing>): Mono<Void> =
    Flux.fromIterable(things).flatMap { thing -> doSomethingTo(thing) }.collectList().then()

Let's say doSomethingTo() returns a Mono<Void> (it writes something to the database, sends a message, etc.). If I replace it with Mono.empty(), everything works as expected; otherwise it doesn't. More specifically, the Mono never completes: it runs through all the processing but the termination signal at the end never arrives. The things are actually written to the database, the messages are actually sent, and so on.

To prove that the lack of termination is the problem, here is a hack that works:

val hackedDelayedMono = Mono.empty<Void>().delayElement(Duration.ofSeconds(1))
return Mono.first(
    workflowStep(things),
    hackedDelayedMono
)

The question is: what can I do with a Mono that never completes to figure out what's going on? There is nowhere I could put a logging statement or a breakpoint, because:

  • there are no errors
  • there are no signals emitted

How could I check what the Mono is waiting for to be completed?

ps. I could not reproduce this behaviour outside the application, with simple Mono workflows.

Returning Mono#empty from a function gives it the return type Mono<Void>, so you need to show us what's in doSomethingTo(). You are most likely breaking the chain somewhere, but you have not posted a reproducible example, so it is impossible to tell where. - Toerktumlare
The delay on your "hackedDelayedMono" does not really do anything, since delayElement only delays elements, not signals. Since it's an empty Mono, it completes immediately on subscribe. - p.streef

2 Answers

2
votes

You can trace and log the events in a reactive stream with the log() operator. This is useful for understanding which signals actually occur in your app.

Flux.fromIterable(things)
    .flatMap(thing -> doSomethingTo(thing))
    .log()
    .collectList()
    .then()

Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).

Reactor Reference Documentation - Logging a Sequence

The Reactor reference documentation also contains other helpful advice for debugging a reactive stream and can be found here: Debugging Reactor
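When several log() calls are active at once, it helps to give each one its own category so the output can be told apart. The sketch below uses the log(String) overload for that. It assumes Reactor is on the classpath; doSomethingTo() here is a hypothetical stand-in that just completes, and the category names are made up for illustration.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-in for the real doSomethingTo(); it completes immediately.
fun doSomethingTo(thing: String): Mono<Void> = Mono.empty()

fun workflowStep(things: List<String>): Mono<Void> =
    Flux.fromIterable(things)
        // One category per inner Mono, so each item's signals are labelled.
        .flatMap { thing -> doSomethingTo(thing).log("doSomethingTo.$thing") }
        // Signals of the merged Flux, before collecting.
        .log("workflowStep.flatMap")
        .collectList()
        .then()
        // Signals of the final Mono<Void> returned to the caller.
        .log("workflowStep.then")
```

If a category shows onSubscribe and request but never onComplete, onError or cancel, the sequence is stuck at or upstream of that point.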

0
votes

(We managed to fix the problem - it was not directly in the code I was working on, but for some reason my changes triggered it. I still don't understand the root cause, but higher up the chain we found a Mono.zip() zipping a Mono<Void>. Although this used to work before, it stopped working at some point. Why is a Mono<Void> even zippable, why don't we get a compiler error, and even worse, why does it work sometimes?)
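On the Mono.zip() point: Void is just a type argument, so the compiler has no reason to reject it. At runtime, a Mono<Void> can only complete empty or fail, and Mono.zip() is documented to complete immediately (without emitting a tuple) as soon as any source completes empty. The sketch below only illustrates that documented behaviour with stand-in Monos; it is not a reconstruction of our actual code, and it does not by itself explain a hang.

```kotlin
import reactor.core.publisher.Mono

// Returns (result was null, downstream doOnNext ran).
fun zipWithVoid(): Pair<Boolean, Boolean> {
    var downstreamRan = false
    // Stand-in for a database write or message send that returns Mono<Void>.
    val saved: Mono<Void> = Mono.empty()
    val value: Mono<String> = Mono.just("value")

    val result = Mono.zip(value, saved)
        // Never called: zip completes empty because `saved` emits no element.
        .doOnNext { downstreamRan = true }
        .block()

    return Pair(result == null, downstreamRan)
}
```

Anything chained after such a zip that relies on the tuple (map, flatMap, doOnNext) is skipped. If the intent is "do the write, then continue with the value", sequencing with then(...) or thenReturn(...) avoids depending on an element that can never arrive.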

To answer my own question here: the debugging technique was to add the following to each Mono in the chain, one at a time, until I reached one that no longer produced any output:

mono.doOnEach { x ->
    logger.info("signal: ${x}")
}
.then(Mono.defer {
    logger.info("then()")
    Mono.empty<Void>()
})

I also experimented with .log(). It is also a fine tool, but maybe too detailed, and it is not very easy to tell which Mono produces which log messages, because they are logged with the dynamic scope rather than the lexical scope. The method above gives you the lexical scope unambiguously.
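The same idea can be packaged as a small extension function so that each call site carries its own label. This is only a sketch: trace() and its sink parameter are names made up here, not part of Reactor. Besides doOnEach it also reports subscription and cancellation, which doOnEach does not see.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical helper: labels every lifecycle event of this Mono.
// `sink` defaults to println; pass a logger call instead if preferred.
fun <T> Mono<T>.trace(label: String, sink: (String) -> Unit = { println(it) }): Mono<T> =
    this
        .doOnSubscribe { sink("$label: subscribed") }
        // onNext, onComplete and onError arrive here as Signal objects.
        .doOnEach { signal -> sink("$label: $signal") }
        .doOnCancel { sink("$label: cancelled") }
```

For example, workflowStep(things).trace("workflowStep") labels that step. A Mono that logs "subscribed" and then nothing else (as Mono.never() would) is the one that is waiting on something upstream.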