I want to call independent requests simultaneously with WebClient. My previous approach with RestTemplate was blocking my threads while waiting for the response. So I figured that WebClient with ParallelFlux could use one thread more efficiently, because it is supposed to schedule multiple requests with one thread.
My endpoint expects a tuple of an id and a location.
The fooFlux method will be called a few thousand times in a loop with different parameters. The returned map will be asserted against stored reference values.
Previous attempts at this code resulted in duplicated API calls.
But there is still a flaw. The size of the key set of mapping is often smaller than the size of Set<String> locations. In fact, the size of the resulting map changes from run to run, and every now and then it is correct. So there might be an issue with the subscription finishing after the method has returned the map.
    public Map<String, ServiceDescription> fooFlux(String id, Set<String> locations) {
        Map<String, ServiceDescription> mapping = new HashMap<>();
        Flux.fromIterable(locations)
            .parallel()
            .runOn(Schedulers.boundedElastic())
            .flatMap(location -> {
                Mono<ServiceDescription> sdMono = getServiceDescription(id, location);
                Mono<Mono<ServiceDescription>> sdMonoMono = sdMono.flatMap(item -> {
                    mapping.put(location, item);
                    return Mono.just(sdMono);
                });
                return sdMonoMono;
            })
            .then()
            .block();
        LOGGER.debug("Input Location size: {}", locations.size());
        LOGGER.debug("Output Location in map: {}", mapping.keySet().size());
        return mapping;
    }
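
One possible explanation, not confirmed in the question, is that HashMap is not thread-safe and mapping.put is reached from several threads at once, which can silently drop entries. The following is a minimal sketch under that assumption. It uses CompletableFuture from the JDK in place of WebClient so that it runs without extra dependencies, and fakeFetch is a hypothetical stand-in for getServiceDescription that makes no network call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ConcurrentCollectSketch {

    // Hypothetical stand-in for getServiceDescription(id, location);
    // it only derives a value from its inputs.
    static String fakeFetch(String id, String location) {
        return id + "@" + location;
    }

    // Starts one asynchronous task per location and stores each result in a
    // thread-safe map, so concurrent put() calls cannot lose entries.
    static Map<String, String> collect(String id, Set<String> locations) {
        Map<String, String> mapping = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (String location : locations) {
            tasks.add(CompletableFuture
                    .supplyAsync(() -> fakeFetch(id, location))
                    .thenAccept(result -> mapping.put(location, result)));
        }
        // Wait for every task to finish before returning the map.
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        return mapping;
    }

    public static void main(String[] args) {
        Set<String> locations = new HashSet<>(Arrays.asList("a", "b", "c"));
        System.out.println(collect("42", locations).size()); // prints 3
    }
}
```

In the Reactor pipeline above, the analogous change would presumably be to declare mapping as a ConcurrentHashMap, or to avoid the shared map entirely by switching back with sequential() and collecting with collectMap.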
Handle Get-Request
    private Mono<ServiceDescription> getServiceDescription(String id, String location) {
        String uri = URL_BASE.concat(location).concat("/detail?q=").concat(id);
        Mono<ServiceDescription> serviceDescription =
            webClient.get()
                .uri(uri)
                .retrieve()
                .onStatus(HttpStatus::isError, clientResponse -> {
                    LOGGER.error("Error while calling endpoint {} with status code {}", uri,
                        clientResponse.statusCode());
                    throw new RuntimeException("Error while calling Endpoint");
                })
                .bodyToMono(ServiceDescription.class)
                .retryBackoff(5, Duration.ofSeconds(15));
        return serviceDescription;
    }
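
Because fooFlux blocks until every request has finished, the retryBackoff(5, Duration.ofSeconds(15)) setting bounds how long a single failing location can hold up the whole call. The sketch below estimates that wait under a simplifying assumption: each delay is exactly double the previous one, with no jitter and no cap. Reactor's actual schedule is randomized, so treat the number as a rough order of magnitude. The class and method names are hypothetical.

```java
import java.time.Duration;

final class BackoffEstimateSketch {

    // Sums the delays of an idealised exponential backoff:
    // first, 2 * first, 4 * first, ... for the given number of retries.
    static Duration worstCaseWait(int retries, Duration firstBackoff) {
        Duration total = Duration.ZERO;
        Duration delay = firstBackoff;
        for (int i = 0; i < retries; i++) {
            total = total.plus(delay);
            delay = delay.multipliedBy(2);
        }
        return total;
    }

    public static void main(String[] args) {
        // 15 + 30 + 60 + 120 + 240 seconds
        System.out.println(worstCaseWait(5, Duration.ofSeconds(15)).getSeconds()); // prints 465
    }
}
```

Under these assumptions, one persistently failing location could delay the returned map by several minutes, which matters when the method is called a few thousand times in a loop.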
Why JsonNode.class and not serialize/deserialize into a concrete object? And why use reactive programming for something that can be solved using @Async? Reactive programming is not async programming. They are two different things that complement each other. - Toerktumlare
JsonNode.class because the received JSON model is huge and I just need a tiny bit of it. I came up with reactive programming because of a Baeldung article (baeldung.com/spring-webclient-resttemplate). I want to achieve a gain in download speed. The RestTemplate approach within parallel streams got me between 10 Mbit/s and 200 Mbit/s download on my network card, depending on the number of locations per id, which varies from 1 to ~4000. - froehli