I am new to WebFlux and have not been able to find the right material to continue with my implementation.

I want to issue a request and process the response asynchronously. In this case the service call takes about 8-10 ms to respond, so we want to issue the request, continue doing other work, and pick up the response when it is needed for further processing.

Mono<Map<String,Price>> resp = webClient.post()
        .uri("/{type}", isCustomerPricing ? "customer" : "profile")
        .body(Mono.just(priceDetailsRequest), PriceDetailsRequest.class)
        .retrieve()
        .bodyToMono(customerPriceDetailsType);

How do we make this call execute asynchronously on a different thread? I tried subscribeOn with Schedulers.single() and Schedulers.parallel(), but the call was not executed until Mono.block() was called.

How do we achieve the following?

  1. We want this call to execute in parallel on a separate thread, so that the current thread can continue with other work.
  2. When processing completes, set the response on the context.
  3. When the current thread looks for the response and the service call has not completed, block until the call completes.
I was able to get it to publish asynchronously and receive the response by doing this: .subscribeOn(Schedulers.elastic()).subscribe(x -> LOGGER.info("Received Response"+x.getClass())). Is this the right way? When the response needs to be used and it is not yet available, how can I have the current thread block (much like Future.get())? - basu76
It already waits automatically, but in a non-blocking way: the event loop is released until the response is returned. - Rajesh J Advani

1 Answer

You don't need to block to consume the response. Just add an operator to the same chain that consumes the response. An example is given below.

Disposable subscription = webClient.post()
        .uri("/{type}", isCustomerPricing ? "customer" : "profile")
        .body(Mono.just(priceDetailsRequest), PriceDetailsRequest.class)
        .retrieve()
        .bodyToMono(CustomerPriceDetailsType.class)
        .map(processor::responseToDatabaseEntity) // Create a persistable entity from the response
        .map(priceRepository::save)               // Save the entity (assumes a blocking save(); use flatMap if save() returns a Mono)
        .subscribe();                             // Triggers the pipeline; returns a reactor.core.Disposable, not a Mono

Alternatively, you can pass a consumer as a parameter to the subscribe() method; it is invoked with the response once it arrives.
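
If you really do need the Future.get()-style behaviour from point 3 of the question (start the call now, block only when the value is needed), one option is a CompletableFuture. In Reactor, calling toFuture() on the Mono subscribes immediately and returns a CompletableFuture. The sketch below uses only the JDK so it can run on its own: fetchPrices() and the "sku-1" entry are hypothetical stand-ins for the WebClient call, not part of the original code. Blocking like this is probably best kept off the WebFlux event-loop threads.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DeferredPriceLookup {

    // Hypothetical stand-in for the remote pricing call (not the real WebClient request).
    static Map<String, Double> fetchPrices() {
        try {
            TimeUnit.MILLISECONDS.sleep(10); // simulate the 8-10 ms service latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Map.of("sku-1", 9.99);
    }

    // Starts the call immediately on another thread and returns a handle to the result.
    // With Reactor, the equivalent handle would come from calling toFuture() on the Mono.
    static CompletableFuture<Map<String, Double>> startLookup() {
        return CompletableFuture.supplyAsync(DeferredPriceLookup::fetchPrices);
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, Double>> pending = startLookup();

        // The current thread is free to do other work while the call is in flight.
        int otherWork = 1 + 1;

        // join() returns at once if the result is ready; otherwise it blocks until it is.
        Map<String, Double> prices = pending.join();
        System.out.println(otherWork + " " + prices);
    }
}
```

The handle can be stored wherever the "context" from point 2 lives, and whichever code needs the prices calls join() (or get() with a timeout) at that point.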