I have these two requests:

Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}

Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}

And with these I want to do the following:

Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");

                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);

                sink.next(product);
            }));
}

When I call this I get the following output:

PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....

Of course, the stream completes before the results have actually arrived, because the Mono mapping is asynchronous. How can I keep this non-blocking while also making sure that all results arrive before onComplete is called?

Could you explain what your goal is? Do you want to have a Flux<Product> when given ids? Are you doing something specific with those ids, besides fetching the product information? I don't understand why you've chosen to use Flux.create in this case. - Brian Clozel
What else should I do? The code that calls getProducts() performs a .toStream(), so I can't call .subscribe() there. My goal is a fully reactive chain: get the ids, fetch the product for each id, and set the products in my view once the stream has completed. - Mulgard
In your view? So this is a controller method? - Brian Clozel
Yes, getProducts() is a controller method. - Mulgard

1 Answer

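Assuming getProducts is a controller method and that you want to add those products to your model so they can be rendered in a view template, there is no need for Flux.create: flatMap already returns a Flux that completes only after the id stream and every inner Mono have completed. You could solve this problem like this: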

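@GetMapping("/products")
public String getProducts(Model model) {

    // flatMap subscribes to one getProduct(...) Mono per id and merges the results
    Flux<Product> products = this.gateway.getProductIds()
            .flatMap(productId -> this.getProduct(productId.getID()));
    // Model has no put(...); addAttribute is the method on org.springframework.ui.Model.
    // Spring WebFlux resolves reactive model attributes before the view is rendered.
    model.addAttribute("products", products);
    // return the view name
    return "showProducts";
}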
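
The ordering issue from the question can also be reproduced without Reactor. The following is only a sketch and not Reactor code: it uses the JDK's CompletableFuture as a stand-in for the Mono returned by getProduct, and the class name CompletionOrderSketch, the fetchProduct helper, and the 50 ms delay are all made up for illustration. It shows the same idea the answer relies on: the completion step is chained after all per-id lookups rather than after the id list alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CompletionOrderSketch {

    // Hypothetical stand-in for getProduct(id): resolves asynchronously after a short delay.
    static CompletableFuture<String> fetchProduct(String id) {
        return CompletableFuture.supplyAsync(
                () -> "product-" + id,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Records events in arrival order and returns them once everything has finished.
    static List<String> run(List<String> ids) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        // Start one asynchronous lookup per id; each records its product when it arrives.
        List<CompletableFuture<Void>> lookups = new ArrayList<>();
        for (String id : ids) {
            lookups.add(fetchProduct(id)
                    .thenAccept(product -> events.add("NEW PRODUCT: " + product)));
        }

        // The completion step depends on ALL lookups, not just on the id list being exhausted.
        CompletableFuture<Void> done = CompletableFuture
                .allOf(lookups.toArray(new CompletableFuture[0]))
                .thenRun(() -> events.add("PRODUCTS COMPLETE"));

        // join() is used only so this demo can return the recorded events.
        done.join();
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run(List.of("1", "2", "3")).forEach(System.out::println);
    }
}
```

The order of the individual NEW PRODUCT lines may vary between runs, but PRODUCTS COMPLETE is always recorded last. In the Reactor chain from the question, the corresponding change would be to place doOnComplete after flatMap instead of before it, so that it observes the completion of the merged product stream.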