
I am starting to learn WebFlux with Spring Boot. I learned that a RestController endpoint can declare a Flux request body. I expected this to be a real stream: the parts of the request arrive one after another and can be processed one after another. However, after building a small example with a client and a server, I could not get this to work as expected.

So here is the snippet of the server:

    @PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text.log("server.request.").map(piece -> continuousMD5.update(piece)).log("server.response.");
    }

Note: each piece of the text is passed to a continuousMD5 object, which accumulates the pieces and returns the intermediate MD5 hash after each one. The stream is logged before and after the MD5 calculation.
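
The continuousMD5 class itself is not shown. A minimal sketch of how such an accumulator could look, assuming it wraps java.security.MessageDigest and returns the hash as Base64 text (the class name ContinuousMD5 and its internals are hypothetical, not the original code):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical stand-in for the continuousMD5 object; the original class is not shown. */
public class ContinuousMD5 {
    private final MessageDigest digest;

    public ContinuousMD5() {
        try {
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Forgets everything accumulated so far. */
    public synchronized void reset() {
        digest.reset();
    }

    /** Adds one piece and returns the Base64 MD5 of all pieces seen since the last reset. */
    public synchronized String update(String piece) {
        digest.update(piece.getBytes(StandardCharsets.UTF_8));
        try {
            // digest() would clear the accumulated state, so hash a clone instead.
            MessageDigest snapshot = (MessageDigest) digest.clone();
            return Base64.getEncoder().encodeToString(snapshot.digest());
        } catch (CloneNotSupportedException e) {
            throw new IllegalStateException("MD5 implementation is not cloneable", e);
        }
    }

    public static void main(String[] args) {
        ContinuousMD5 md5 = new ContinuousMD5();
        System.out.println(md5.update("The message 1")); // hash of piece 1
        System.out.println(md5.update("The message 2")); // hash of pieces 1 and 2 together
    }
}
```

With an accumulator like this, 100 separate elements would produce 100 intermediate hashes, while one concatenated element produces a single hash. Note that one shared instance holds state across requests, so concurrent requests would interfere with each other.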

And here is the snippet of the client:

    @PostConstruct
    private void init() {
        webClient = webClientBuilder.baseUrl(reactiveServerUrl).build();
    }

    @PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(text.log("client.request."), String.class)
            .retrieve().bodyToFlux(String.class).log("client.response.");
    }

Note: the client accepts a flux stream of text, logs it, and forwards it to the server (also as a flux stream).

Surprisingly, sending a REST request with the following command line made the client receive a flux stream:

for i in $(seq 1 100); do echo "The message $i"; done | http POST :8080/send  Content-Type:text/plain

and I could see in the log of the client:

2019-05-09 17:02:08.604  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2019-05-09 17:02:08.606  INFO 3462 --- [ctor-http-nio-2] client.response.Flux.MonoFlatMapMany.2   : request(1)
2019-05-09 17:02:08.649  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.650  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.674  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.676  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 2)
...
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onNext(The message 100)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.710  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-2] client.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.711  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.860  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.862  INFO 3462 --- [ctor-http-nio-6] client.response.Flux.MonoFlatMapMany.2   : onComplete()
^C2019-05-09 17:02:47.393  INFO 3462 --- [ctor-http-nio-6] client.request.Flux.SwitchIfEmpty.1      : cancel()

So each piece of the text was recognized as an element of the flux stream and was requested separately.

But in the server log:

2019-05-09 17:02:08.811  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-05-09 17:02:08.813  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onSubscribe(FluxMap.MapSubscriber)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(1)
2019-05-09 17:02:08.814  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(1)
2019-05-09 17:02:08.838  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onNext(The message 1The message 2The message 3The message 4The message 5The message 6The message 7The message 8The message 9The message 10The message 11The message 12The message 13The message 14The message 15The message 16The message 17The message 18The message 19The message 20The message 21The message 22The message 23The message 24The message 25The message 26The message 27The message 28The message 29The message 30The message 31The message 32The message 33The message 34The message 35The message 36The message 37The message 38The message 39The message 40The message 41The message 42The message 43The message 44The message 45The message 46The message 47The message 48The message 49The message 50The message 51The message 52The message 53The message 54The message 55The message 56The message 57The message 58The message 59The message 60The message 61The message 62The message 63The message 64The message 65The message 66The message 67The message 68The message 69The message 70The message 71The message 72The message 73The message 74The message 75The message 76The message 77The message 78The message 79The message 80The message 81The message 82The message 83The message 84The message 85The message 86The message 87The message 88The message 89The message 90The message 91The message 92The message 93The message 94The message 95The message 96The message 97The message 98The message 99The message 100)
2019-05-09 17:02:08.840  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onNext(CSubeSX3yIVP2CD6FRlojg==)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : request(32)
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : onComplete()
2019-05-09 17:02:08.852  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : onComplete()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.response.Flux.Map.2               : cancel()
2019-05-09 17:02:47.394  INFO 3475 --- [ctor-http-nio-2] server.request.Flux.SwitchIfEmpty.1      : cancel()

I saw that all the pieces of the text arrived at the server at once and were therefore processed as one big element of the flux stream (one can also verify that only one MD5 hash was calculated instead of 100).

What I would expect is that the server also receives the pieces of text from the client as separate elements of the flux stream; otherwise, from the server's point of view, the request is not really reactive but just a normal blocking request.

Could anyone please help me understand how to make a truly reactive flux request using WebFlux? Thanks!

Update

I used a similar command line to send a REST request directly to the server and could see that the server received the pieces of the text ("The message x") as a flux stream. So I guess the server is OK and the problem is in the client: how can I use the WebClient to make a real flux REST request?

The Reactive model is still mapping onto HTTP, and I'm having trouble understanding what you mean by "pieces": HTTP operates in discrete requests, which define the boundaries of your events. - chrylis -cautiouslyoptimistic-
When I sent the REST request with "The message x" from 1 to 100, the client received these messages one by one; my guess is that the WebFlux framework split the whole body into elements of a flux stream. I would expect the server to do the same, because the client also sends a flux stream. But the server received all the elements as one big chunk and did not split them. I hope this explains it better. - edwardcarlfox

2 Answers

1
votes

If you want to achieve the streaming effect, you can:

0
votes

After trying things out and reading more documentation, I finally figured out how to make my example work:

For the client, I need to make sure that each element of the request body sent to the server is terminated by a line feed:

    @PostMapping(value = "/send", consumes = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> send(@RequestBody Flux<String> text) {
        return webClient.post()
            .uri("/digest")
            .accept(MediaType.TEXT_PLAIN)
            .body(
                text
                    .onBackpressureBuffer()
                    .log("client.request.")
                    .map(piece -> piece + "\n"),
                String.class)
            .retrieve().bodyToFlux(String.class)
            .onBackpressureBuffer()
            .log("client.response.");
    }

This achieves the same effect as making the REST request via the command line, because for i in $(seq 1 100); do echo "The message $i"; done outputs each "The message x" on its own line.

Similarly, for the server, the elements of the response body also need to be separated by line feeds so that the client can decode the body into a flux:

    @PostMapping("/digest")
    public Flux<String> digest(@RequestBody Flux<String> text) {
        continuousMD5.reset();
        return text
            .log("server.request.")
            .map(piece -> continuousMD5.update(piece))
            .map(piece -> piece + "\n")
            .log("server.response.");
    }
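
Both changes rely on the line feed as an element boundary. The following is only a plain-Java sketch of that idea, not the actual WebFlux codec: the class LineFraming and its encode/decode methods are hypothetical names, and it assumes the receiving side cuts the text body at each line feed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of line-delimited framing; not part of Spring. */
public class LineFraming {

    /** Sender side: concatenates the pieces into one body, optionally ending each with a line feed. */
    static byte[] encode(List<String> pieces, boolean withLineFeed) {
        StringBuilder body = new StringBuilder();
        for (String piece : pieces) {
            body.append(piece);
            if (withLineFeed) {
                body.append('\n');
            }
        }
        return body.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Receiver side: cuts the body into elements at each line feed. */
    static List<String> decode(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        List<String> elements = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = text.indexOf('\n', start)) >= 0) {
            elements.add(text.substring(start, end));
            start = end + 1;
        }
        if (start < text.length()) {
            elements.add(text.substring(start)); // trailing data without a delimiter
        }
        return elements;
    }

    public static void main(String[] args) {
        List<String> pieces = List.of("The message 1", "The message 2", "The message 3");
        System.out.println(decode(encode(pieces, false)).size()); // 1: boundaries are lost
        System.out.println(decode(encode(pieces, true)).size());  // 3: boundaries survive
    }
}
```

Without the delimiter, the receiver has no way to tell where one piece ends and the next begins, which matches the single large onNext seen in the server log of the question.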

I also added onBackpressureBuffer() to the client, before sending and after receiving, so that no overflow exception occurs when sending a large number of messages.

However, even though the above code "works", it is not doing real streaming: as I can see in the logs, the server started to receive the request body only after the client had sent the whole request body, and the client started to receive the response body only after the server had sent the whole response body. Perhaps, as Ilya Zinkovich mentioned, using the WebSocket protocol would achieve a real streaming effect, but I have not tried it yet.