I am starting to learn Webflux from Spring-boot. I learned that for an endpoint of a RestController you can define a Flux request body, where I expect a real flux stream, that is, the parts of the whole request come one after each other, and these parts can be processed also one after another. However after building a small example with a client and a server, I could not get this to work as expected.
So here is the snippet of the server:
@PostMapping("/digest")
public Flux<String> digest(@RequestBody Flux<String> text) {
continuousMD5.reset();
return text.log("server.request.").map(piece -> continuousMD5.update(piece)).log("server.response.");
}
Note: each piece of the text will be sent to a continuousMD5 object, which will accumulate all the pieces and calculate and return the intermediate MD5 hash value after each accumulation. The stream will be logged before and after the MD5 calculation.
And here is the snippet of the client:
@PostConstruct
private void init() {
webClient = webClientBuilder.baseUrl(reactiveServerUrl).build();
}
@PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
public Flux<String> send(@RequestBody Flux<String> text) {
return webClient.post()
.uri("/digest")
.accept(MediaType.TEXT_PLAIN)
.body(text.log("client.request."), String.class)
.retrieve().bodyToFlux(String.class).log("client.response.");
}
Note: the client accepts a flux stream of some text and logs the stream and sends it to the server (as a flux stream).
Surprisingly I made it work to send a REST request and let the client receive a flux stream by the following command line:
for i in $(seq 1 100); do echo "The message $i"; done | http POST :8080/send Content-Type:text/plain
and I could see that in the log of the client:
2019-05-09 17:02:08.604 INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2 : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-09 17:02:08.606 INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2 : request(1)
2019-05-09 17:02:08.649 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.650 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(32)
2019-05-09 17:02:08.674 INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1 : onNext(The message 1)
2019-05-09 17:02:08.676 INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.676 INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1 : onNext(The message 2)
...
2019-05-09 17:02:08.710 INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1 : onNext(The message 100)
2019-05-09 17:02:08.710 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.710 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.710 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.711 INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1 : onComplete()
2019-05-09 17:02:08.711 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.711 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.860 INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2 : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.862 INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2 : onComplete()
^C2019-05-09 17:02:47.393 INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1 : cancel()
that each piece of the text was recognized as an element of a flux stream and was requested separately.
But in the server log:
2019-05-09 17:02:08.811 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.813 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : onSubscribe(FluxMap.MapSubscriber)
2019-05-09 17:02:08.814 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : request(1)
2019-05-09 17:02:08.814 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : request(1)
2019-05-09 17:02:08.838 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : onNext(The message 1The message 2The message 3The message 4The message 5The message 6The message 7The message 8The message 9The message 10The message 11The message 12The message 13The message 14The message 15The message 16The message 17The message 18The message 19The message 20The message 21The message 22The message 23The message 24The message 25The message 26The message 27The message 28The message 29The message 30The message 31The message 32The message 33The message 34The message 35The message 36The message 37The message 38The message 39The message 40The message 41The message 42The message 43The message 44The message 45The message 46The message 47The message 48The message 49The message 50The message 51The message 52The message 53The message 54The message 55The message 56The message 57The message 58The message 59The message 60The message 61The message 62The message 63The message 64The message 65The message 66The message 67The message 68The message 69The message 70The message 71The message 72The message 73The message 74The message 75The message 76The message 77The message 78The message 79The message 80The message 81The message 82The message 83The message 84The message 85The message 86The message 87The message 88The message 89The message 90The message 91The message 92The message 93The message 94The message 95The message 96The message 97The message 98The message 99The message 100)
2019-05-09 17:02:08.840 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.852 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : request(32)
2019-05-09 17:02:08.852 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : request(32)
2019-05-09 17:02:08.852 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : onComplete()
2019-05-09 17:02:08.852 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : onComplete()
2019-05-09 17:02:47.394 INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2 : cancel()
2019-05-09 17:02:47.394 INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1 : cancel()
I saw that all the pieces of the text arrived at the server at once and hence were processed as one big element in the flux stream (one can also verify that there was only one MD5 hash calculated instead of 100).
What I would expect is that the server also receives the pieces of text from the client as elements in the flux stream, otherwise for the server it is not real reactive but just a normal blocking request.
Could anyone please help me understand how to make a real flux reactive request using Webflux? Thanks!
Update
I used a similar command line to make a REST request against the server and could see that the server received the pieces of the text ("The message x") as a flux stream. So I guess the server is ok, now the problem may be the client: how can I use the WebClient to make a real flux REST request?