2
votes

I'm trying to confirm the value of an HTTP response header with Spring 5 WebClient, but only if the web call responds with an HTTP 200 status code. In this use case if authentication is not successful, the API call returns with an HTTP 401 without the response header present. I have the following code below which functionally works, but it is making the web call twice (because I'm blocking twice). Short of just blocking on the HTTP response header only, and putting a try/catch for an NPE when the header isn't present, is there any "cleaner" way to do this?

import java.net.URI;
import java.time.Duration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;


import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@SpringBootApplication
public class ContentCheckerApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(ContentCheckerApplication.class);

private ExchangeFunction exchange = ExchangeFunctions.create(new ReactorClientHttpConnector());         

public static void main(String[] args) {
    SpringApplication app = new SpringApplication(ContentCheckerApplication.class);
    // prevent SpringBoot from starting a web server
    app.setWebApplicationType(WebApplicationType.NONE);
    app.run(args);
}

@Bean
public CommandLineRunner myCommandLineRunner() {

    return args -> {
              // Our reactive code will be declared here
        LinkedMultiValueMap<String, String> formData = new LinkedMultiValueMap<String, String>();

        formData.add("username", args[2]);
        formData.add("password", args[3]);

        ClientRequest request = ClientRequest.method(HttpMethod.POST, new URI(args[0]+"/api/token"))
                .body(BodyInserters.fromFormData(formData)).build();

        Mono<ClientResponse> mresponse = exchange.exchange(request);
        Mono<String> mnewToken = mresponse.map(response -> response.headers().asHttpHeaders().getFirst("WSToken"));
        LOGGER.info("Blocking for status code...");
        HttpStatus statusCode = mresponse.block(Duration.ofMillis(1500)).statusCode();
        LOGGER.info("Got status code!");

        if (statusCode.value() == 200) {

            String newToken = mnewToken.block(Duration.ofMillis(1500));
            LOGGER.info("Auth token is: " + newToken);

        } else {
            LOGGER.info("Unable to authenticate successfully! Status code: "+statusCode.value());
        }
       };
    }
}
1
You aren't using the WebClient you are simulating a WebClient yourself. Just use the WebClient, chain the proper calls and don't block. - M. Deinum
But why are you mapping if you only want to do that if the status is OK... Map after that. Don't do operations that are potentially useless. - M. Deinum
Mono<String> tokenResult = webClient.post() .uri( args[0] + "/api/token" ) .body( BodyInserters.fromFormData(formData)) .exchange() .map(response -> { if (HttpStatus.OK.equals(response.statusCode())) { . return response.headers().asHttpHeaders().getFirst("WSToken"); } else { return ""; } }); - BJ Weschke
That works, but I'm still needing to block I think in order to enforce a 1500 ms timeout. Is there something that allows me to set a timeout on the webclient so that I can subscribe to the Mono<String> instead of blocking? - BJ Weschke
Whh do you need to force a timeout? - M. Deinum

1 Answers

2
votes

Thanks to comments from @M. Deinum to guide me, I have the following code which is workable now.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeFunctions;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Mono;

@SpringBootApplication
public class ContentCheckerApplication {

private static final Logger LOGGER = LoggerFactory.getLogger(ContentCheckerApplication.class);

private ExchangeFunction exchange = ExchangeFunctions.create(new ReactorClientHttpConnector());         

public static void main(String[] args) {
    SpringApplication app = new SpringApplication(ContentCheckerApplication.class);
    // prevent SpringBoot from starting a web server
    app.setWebApplicationType(WebApplicationType.NONE);
    app.run(args);
}

@Bean
public CommandLineRunner myCommandLineRunner() {

    return args -> {
              // Change some Netty defaults
        ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
                  options -> options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
                                    .compression(true)
                                    .afterNettyContextInit(ctx -> {
                                        ctx.addHandlerLast(new ReadTimeoutHandler(1500, TimeUnit.MILLISECONDS));
                                    }));


        LinkedMultiValueMap<String, String> formData = new LinkedMultiValueMap<String, String>();

        formData.add("username", args[2]);
        formData.add("password", args[3]);

        WebClient webClient = WebClient.builder().clientConnector(connector).build();

            Mono<String> tokenResult = webClient.post()
                    .uri( args[0] + "/api/token" )
                    .body( BodyInserters.fromFormData(formData))
                    .exchange()
                    .onErrorMap(ContentCheckerApplication::handleAuthTokenError)
                    .map(response -> {

                            if (HttpStatus.OK.equals(response.statusCode())) {
                                return response.headers().asHttpHeaders().getFirst("WSToken");
                            } else {
                                return "";
                            }

                    });

            LOGGER.info("Subscribing for the result and then going to sleep");
            tokenResult.subscribe(ContentCheckerApplication::handleAuthTokenResponse);

        Thread.sleep(3600000);
       };
    }

private static Throwable handleAuthTokenError(Throwable e) {
    LOGGER.error("Exception caught trying to process authentication token. ",e);
    ContentCheckerApplication.handleAuthTokenResponse("");      
    return null;        
}

private static void handleAuthTokenResponse(String newToken) {

    LOGGER.info("Got status code!");

    if (!newToken.isEmpty()) {

        LOGGER.info("Auth token is: " + newToken);

    } else {
        LOGGER.info("Unable to authenticate successfully!");
    }

    System.exit(0);
}
}