0
votes

I'm new to reactive programming and am trying to create a reactive service with WebFlux and WebClient.

The flow of the method is:

  1. Send a POST request and wait for the response
  2. Pass the body of the response to a mapping service (which has some other business logic) that returns a Recommendations type
  3. Create a ResponseEntity
  4. Create a Mono of type Mono<ResponseEntity>

My question: is this a valid way of doing this, or should I be using .exchange()? And is there a way of chaining these steps instead of writing them as individual statements?

Current implementation:

private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);
    return Mono.just(entity);

}

2 Answers

2
votes

The short answer is probably no.

The longer version: a WebFlux service is a producer, and the calling client wants to consume data from it. For each request, your handler method builds (assembles) the reactive chain, and WebFlux then subscribes to the Mono or Flux you return. Building the chain, which can be compared to setting up a chain of callbacks, is called the "assembly phase"; nothing actually runs until the subscription happens.

During this assembly phase it is important that each step is chained onto the Mono/Flux returned by the previous one. Otherwise you are breaking the chain.

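var firstAction = Mono.just("Hello").flatMap(value -> {
    // do something, then return a Mono (flatMap requires one)
    return Mono.just(value + " world");
});

// Next action needs to chain on the last
var secondAction = firstAction.flatMap(value -> {
    // do the next thing
    return Mono.just(value.toUpperCase());
});

// Can be combined
var bothActions = Mono.just("Hello").flatMap(value -> {
    // do something
    return Mono.just(value + " world");
}).flatMap(value -> {
    // do next thing
    return Mono.just(value.toUpperCase());
});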

In the above example you can see that I am always chaining onto the previous action, so the chain is never broken.
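To see why an unchained step matters, here is a minimal JDK-only sketch. LazyValue is a hypothetical toy stand-in for a lazy publisher, not Reactor's Mono, and callRemote, broken and chained are made-up names. It only illustrates the idea that work which is not linked into the returned chain never runs when the result is subscribed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy stand-in for a lazy publisher; NOT Reactor's Mono, just the same idea in miniature.
final class LazyValue<T> {
    private final Supplier<T> source;

    private LazyValue(Supplier<T> source) {
        this.source = source;
    }

    static <T> LazyValue<T> fromSupplier(Supplier<T> source) {
        return new LazyValue<>(source);
    }

    // Synchronous transformation, comparable to map.
    <R> LazyValue<R> map(Function<? super T, ? extends R> fn) {
        return new LazyValue<>(() -> fn.apply(source.get()));
    }

    // Nothing runs until this is called, comparable to subscribing.
    T subscribe() {
        return source.get();
    }
}

final class ChainDemo {
    // Records which side effects actually happened.
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical remote call; it only logs and returns a fixed payload.
    static LazyValue<String> callRemote() {
        return LazyValue.fromSupplier(() -> {
            LOG.add("remote call");
            return "payload";
        });
    }

    // Broken: the remote call is assembled but never linked to what is returned.
    static LazyValue<String> broken() {
        LazyValue<String> ignored = callRemote().map(String::toUpperCase);
        return LazyValue.fromSupplier(() -> "fresh value");
    }

    // Chained: the returned value is built on top of the remote call.
    static LazyValue<String> chained() {
        return callRemote().map(String::toUpperCase);
    }

    public static void main(String[] args) {
        System.out.println(broken().subscribe() + " " + LOG);  // prints "fresh value []"
        System.out.println(chained().subscribe() + " " + LOG); // prints "PAYLOAD [remote call]"
    }
}
```

In the broken variant the remote call is never executed, because nothing that gets subscribed to depends on it.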

Now to your code.

private Mono<ResponseEntity<Recommendations>> myMethod(final Request request, final String variantName) {

    // Here you have a response
    Mono<String> response = webClient.build()
            .post()
            .uri(uri)
            // Not needed: for an object body serialized as JSON, the content type will default to JSON
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            // Probably not needed either
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve().bodyToMono(String.class);

    // here you pass the response into a function, hence probably breaking the chain
    var recommendations = ((XYZResponseMapper) responseMapper).mapReactive(request, response, useCaseId, variantName); //return type Recommendations
    var entity = new ResponseEntity<>(recommendations, nullHeaders, HttpStatus.OK);

    // Here you are suddenly creating a new Mono, which tells me you definitely broke the chain and need to recreate it with Mono#just
    return Mono.just(entity);

}

So how do we solve it?

private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    // You should not build a webclient on each request, you should build it in a @Bean
    final Mono<XYZResponse> response = webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            // Map into a class representation to uphold type safety
            .bodyToMono(XYZResponse.class);

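    // If we need additional transformations we chain on: map for a synchronous step,
    // flatMap when the step itself returns a Mono.
    // Assuming mapper.toRecommendations returns a plain Recommendations, map is the right fit.
    final Mono<Recommendations> recommendations = response.map(value -> mapper.toRecommendations(value));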

    // No need to wrap it in a ResponseEntity; WebFlux will do that for you automatically when you return it to the client
    return recommendations;
}

We can then shorten it even further.

private Mono<Recommendations> myMethod(final Request request, final String variantName) {
    return webClient.build()
            .post()
            .uri(uri)
            .bodyValue(requestBody)
            .retrieve()
            .bodyToMono(XYZResponse.class)
            // Assuming mapper.toRecommendations is synchronous; use flatMap if it returns a Mono
            .map(value -> mapper.toRecommendations(value));
}
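On the choice of operator in a chain like this: Reactor's map takes a function that returns a plain value, while flatMap takes a function that returns another Mono. As a rough JDK-only analogy (CompletableFuture instead of Reactor, so eager rather than lazy), thenApply plays the role of map and thenCompose the role of flatMap. The names parseLength and enrich below are hypothetical and exist only for this sketch.

```java
import java.util.concurrent.CompletableFuture;

final class MapVersusFlatMap {
    // Hypothetical synchronous mapper: returns a plain value.
    static int parseLength(String body) {
        return body.length();
    }

    // Hypothetical asynchronous step: returns another future.
    static CompletableFuture<String> enrich(int length) {
        return CompletableFuture.supplyAsync(() -> "length=" + length);
    }

    static CompletableFuture<String> pipeline(String body) {
        return CompletableFuture.completedFuture(body)
                // Plain value in, plain value out (comparable to map).
                .thenApply(MapVersusFlatMap::parseLength)
                // The step returns a future, so it is flattened (comparable to flatMap).
                .thenCompose(MapVersusFlatMap::enrich);
    }

    public static void main(String[] args) {
        System.out.println(pipeline("hello").join()); // prints "length=5"
    }
}
```

If a synchronous mapper is passed to the flattening operator, the code does not compile, because the function has to return the wrapper type.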

I usually write the return statement on the first line, then write my chain after that.

If you want to optimise it further (this is generally recommended), create the WebClient in a configuration bean so that it is built only once, at startup of the WebFlux server, and then reused for each request.

@Configuration
public class ClientConfig {

    @Bean
    public WebClient webclient(WebClient.Builder webClient) {
        return webClient.baseUrl( ... )
                .build();
    }
}

@Component
public class RecommendationHandler {

    private final WebClient webClient;

    @Autowired
    public RecommendationHandler(WebClient webClient) {
        this.webClient = webClient;
    }

    private Mono<Recommendations> getRecommendations(RequestBody requestBody) {
        return webClient
                .post()
                .bodyValue(requestBody)
                .retrieve()
                .bodyToMono(XYZResponse.class)
                // mapper is assumed to be another injected field that returns a plain Recommendations
                .map(value -> mapper.toRecommendations(value));
    }
}

Something like this. This is just an example; I have not run it in an IDE, I just wrote it off the top of my head. But those are some of the things I thought of. There is no error handling in this, which should be addressed.

Good luck

-1
votes

After a lot of reading and experimenting I managed to make it work with the following:

    return webClient.build()
            .post()
            .uri(uri)
            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
            .bodyValue(requestBody)
            .retrieve()
            .toEntity(String.class)
            .publishOn(Schedulers.boundedElastic())
            .map(x -> {
                var recs = processResponse(request, x.getBody(), useCaseId, variantName);
                return new ResponseEntity<GatewayRecommendations>(recs, x.getStatusCode());
            });