I am seeing some strange behaviour with the following client-server interaction and I am wondering whether I am running into HTTP/1.1 semantics or my Reactive programming skills need work (or both).
I am attempting to create a client-server interaction where both the request and response bodies are long-running streams of data.
The client is a Spring Reactive WebClient that sends an infinite stream in the request body. It expects to receive (and log) an infinite stream of results.
Flux<Long> requests = Flux.interval(Duration.ofSeconds(2));
return WebClient.create()
.post()
.uri("/instructions")
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(requests, Long.class)
.retrieve()
.bodyToFlux(Object.class)
.map(response -> {
log.info("Received Response Object {}", response);
return response;
});
The server is a spring-boot-starter-webflux application with a route handler to log the request objects as they are received and provide an infinite stream of results:
public Mono<ServerResponse> instructions(ServerRequest request) {
// Log the request objects as they are received
Flux<Object> requestStream = request.bodyToFlux(Object.class)
.map(r -> {
log.info("Received Request Object: {}", r);
return r;
});
requestStream.subscribe();
// Infinite stream of responses
Flux<Long> responses = Flux.interval(Duration.ofSeconds(5));
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(responses, Long.class);
}
When the above code is run, the server logs the infinite stream of request objects but the client never logs any response objects.
If I bound the request stream by doing something like so: Flux<Long> requests = Flux.interval(Duration.ofSeconds(2)).take(20); then the client begins to log the responses after all requests are received.
What are the problems here? * Is there something wrong with the reactive code? * Is this part of the HTTP/1.1 specification where a response header should not be sent until the request body is completely received?