I have these two requests:
Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}
Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}
And with these I want to do the following:
Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");
                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);
                sink.next(product);
            }));
}
When I call this I get the following output:
PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....
Of course, the stream is closed before the results are actually there, because of the asynchronous Mono mapping. How can I keep this non-blocking while also making sure that the results arrive before onComplete is called?
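The ordering problem described above is not specific to Reactor, so it can be reproduced with the JDK alone. The following is only a rough sketch using `CompletableFuture` as an analogy: `fetchProduct`, `startFetches`, `completeWhenIdsEnd` and `completeWhenFetchesEnd` are hypothetical names, and the 50 ms delay is an assumed stand-in for the HTTP call made by `getProduct`. The first variant signals completion as soon as the ids run out, the second only after every asynchronous fetch has delivered its result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class CompletionOrderSketch {

    // Hypothetical stand-in for getProduct(id): finishes asynchronously after a short delay.
    static CompletableFuture<String> fetchProduct(String id) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // assumed latency of the remote call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "product-" + id;
        });
    }

    // Starts one asynchronous fetch per id and records each result as it arrives.
    static List<CompletableFuture<Void>> startFetches(List<String> ids, List<String> events) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String id : ids) {
            pending.add(fetchProduct(id).thenAccept(p -> events.add("NEW PRODUCT: " + p)));
        }
        return pending;
    }

    // Mirrors the question: completion is signalled once the ids run out,
    // while the fetches are typically still in flight.
    static List<String> completeWhenIdsEnd(List<String> ids) {
        List<String> events = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = startFetches(ids, events);
        events.add("PRODUCTS COMPLETE");
        // Wait here only so the caller sees the full event list.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return events;
    }

    // Completion is signalled only after every fetch has delivered its result.
    static List<String> completeWhenFetchesEnd(List<String> ids) {
        List<String> events = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = startFetches(ids, events);
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        events.add("PRODUCTS COMPLETE");
        return events;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3");
        System.out.println(completeWhenIdsEnd(ids));
        System.out.println(completeWhenFetchesEnd(ids));
    }
}
```

In Reactor terms, the second variant corresponds to attaching `doOnComplete` after `flatMap` rather than before it: the flux returned by `flatMap` completes only once the source and all inner publishers have completed, whereas `doOnComplete` placed before `flatMap` observes the completion of the id source alone.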
Flux<Product> when given ids? Are you doing something specific with those ids, besides fetching the product information? I don't understand why you've chosen to use Flux.create in this case. – Brian Clozel
getProducts() is performing .toStream(), and because of that I can't do a .subscribe() there. My goal is to have a fully reactive chain, from getting the ids, through getting the product for each id, to setting the products in my view when the stream is completed. – Mulgard
getProducts() is a controller method. – Mulgard