I'm quite new to reactive programming, but am trying to write some new code using Project Reactor's Flux/Mono APIs, in a Spring Boot application.
- I have a finite stream of data (with unique IDs) coming from some library.
- Each datum in the stream has a "parent" datum in the stream, which might have been emitted before, or is still to come.
- I need to transform this data before sending it to another system into objects that include the parent.
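For concreteness, here is a minimal sketch of the shapes involved; the type and field names are my own stand-ins, not the library's:

```java
public class Shapes {
    // Hypothetical stand-in: each Datum has a unique id and a reference to
    // its parent's id, which may point forwards or backwards in the stream.
    record Datum(String id, String parentId) {}

    // The outgoing object embeds the resolved parent Datum itself.
    record OutDatum(String id, Datum parent) {}

    public static void main(String[] args) {
        Datum child = new Datum("c1", "p1");   // parent "p1" may not have arrived yet
        Datum parent = new Datum("p1", null);
        System.out.println(new OutDatum(child.id(), parent));
    }
}
```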
Imperatively, I could write:
List<Datum> data = library.getData();
Map<String, Datum> lookup = data.stream()
        .collect(toMap(Datum::getId, Function.identity()));
List<OutDatum> outData = data.stream()
        .map(d -> OutDatum.builder()
                .id(d.getId())
                ...
                .parent(lookup.get(d.getParent()))
                .build())
        .collect(toList());
send(outData);
What I did as a first step:
- Create a Flux from the data (emitting from the list for now, hoping that the library can later actually provide the data in a streaming fashion).
- cache() the Flux so that it won't redo the work of fetching from the library.
- Create a lookup Mono off the data Flux that collects the data into a map for parent lookups.
- Use map() on the data Flux to transform each Datum into an OutDatum, using the lookup Mono to resolve the parent.
- Pass the mapped data Flux to the WebClient to send.
Flux<Datum> data = emitAsFlux(library::getData)
        .cache();
Mono<Map<String, Datum>> lookup = data.collectMap(Datum::getId)
        .cache();
send(data.map(d -> OutDatum.builder()
        .id(d.getId())
        ...
        .parent(lookup.block().get(d.getParent()))
        .build()));
I understand that blocking is frowned upon in reactive code, but the dependency on the map seems to require it here. (As I understand it, because of the cache() on the lookup Mono, blocking multiple times is not too costly; that's something I'd refactor out later if necessary.)
My issue is that it hangs. Because the lookup Mono and the map() both subscribe to the same original Flux, the lookup map can never finish being built: map() blocks waiting for the map, which prevents the Flux from continuing, which prevents the map from ever completing. There's a deadlock.
I tried creating a proxy Flux using share(), but it didn't seem to help.
- Is there a way to let the lookup map exhaust the stream while the data Flux is still processing earlier elements?
- I'd love to know what a good pattern for implementing this reactively would be.
- I'd also like to know where I'm being stupid or appear to be lacking understanding.
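For context, the direction I've been experimenting with (just a sketch, using minimal stand-in record types instead of the real Datum/OutDatum) defers the per-element mapping until the lookup map has been fully collected, so nothing needs to block:

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class LookupSketch {
    // Minimal stand-ins for the real types (names and fields are assumptions).
    record Datum(String id, String parentId) {}
    record OutDatum(String id, Datum parent) {}

    static List<OutDatum> transform(List<Datum> input) {
        Flux<Datum> data = Flux.fromIterable(input).cache();
        // collectMap drains the cached Flux to completion first; flatMapMany
        // then replays the cached elements with the map in hand, so no
        // block() is needed inside the pipeline.
        return data.collectMap(Datum::id)
                .flatMapMany(lookup -> data.map(d ->
                        new OutDatum(d.id(), lookup.get(d.parentId()))))
                .collectList()
                .block();   // only for the demo; real code would keep it reactive
    }

    public static void main(String[] args) {
        List<OutDatum> out = transform(List.of(
                new Datum("a", "b"),    // parent "b" arrives after the child
                new Datum("b", null)));
        out.forEach(o -> System.out.println(o.id() + " -> " + o.parent()));
    }
}
```

I'm not sure whether this is idiomatic, though, which is why I'm asking.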
FYI, emitAsFlux looks something like this:
private Flux<Datum> emitAsFlux(final Callable<List<Datum>> dataProvider) {
    return Flux.create(emitter -> {
        // Run the blocking library call on a separate thread and push
        // each datum into the sink as it becomes available.
        taskExecutor.execute(() -> {
            try {
                dataProvider.call().forEach(emitter::next);
                emitter.complete();
            } catch (Exception e) {
                emitter.error(e);
            }
        });
    });
}
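As an aside, I suspect (though I haven't verified) that hand-rolling Flux.create may not be necessary here; a sketch of what I imagine might be equivalent, with a generic T in place of Datum so it stands alone:

```java
import java.util.List;
import java.util.concurrent.Callable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class EmitSketch {
    // Possible simpler equivalent of emitAsFlux: fromCallable handles the
    // checked exception declared by Callable, and subscribeOn moves the
    // blocking library call off the subscriber's thread.
    static <T> Flux<T> emitAsFlux(Callable<List<T>> dataProvider) {
        return Mono.fromCallable(dataProvider)
                .flatMapMany(Flux::fromIterable)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println(emitAsFlux(() -> List.of("a", "b", "c"))
                .collectList()
                .block());
    }
}
```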