This is a second attempt, with revised demo code that, hopefully, better illustrates the issue. The code has been stripped down to remove all elements except those demonstrating the issue being encountered.
Added note: I did some additional testing and posted the results as an answer (rather than extending this post). This may be "expected behavior", but I'm still trying to understand "the why".
The code "works", in that it returns the expected information (either a String, or a list of Strings). However, when a WebClient is used to access a REST endpoint (localhost:8080/test/DemoClient) that returns a Flux, two calls are being made into the associated handler (DemoMainHandler.getAll()). I do not see where the second call is being made on DemoMainHandler.getAll(), but am concerned about potential performance issues if this occurs in a production environment.
In the code provided, everything is running under a single Spring Webflux application, so there is not a separate process for the DemoClient code.
Accessing the REST endpoint at localhost:8080/test/DemoClient/2 appears to work correctly, returning a Mono to Postman with the value "Only One". More importantly, DemoMainHandler.getById() is only invoked once.
However, accessing the REST endpoint at localhost:8080/test/DemoClient produces results that are a bit concerning. The String values returned to Postman via a Flux appear okay, but DemoMainHandler.getAll() is invoked twice (see the console output). As I understand it, the sequence is:
- DemoClientHandler.getAll() is invoked when the REST endpoint is accessed
- DemoClientHandler.getAll() invokes DemoClient.getAll()
- DemoClient.getAll() uses a WebClient to access the REST endpoint at localhost:8080/test/DemoMain
- DemoClient.getAll() uses flatMapMany to map the returned ClientResponse to the Flux extracted from the response body
- The Flux generated by DemoClient.getAll() is returned to DemoClientHandler.getAll()
- DemoClientHandler.getAll() inspects the Flux, determines it has one-or-more elements, and returns the Flux in a ServerResponse to the initial client (in this case, Postman)
- Postman then unpacks the Flux (map? flatMap?) and displays the returned Strings ("CallMeOnce")
What I do not understand is why DemoMainHandler.getAll() is being invoked a second time, as indicated by the second System.out.println() line in the console output. Is it associated with the use of Flux as a return type?
Added note: On the chance that the issue was somehow driven by the .exchange().flatMapMany() construct, I tried the .retrieve().bodyToFlux() construct instead (see the commented-out code in DemoClient). Same result (i.e., DemoMainHandler.getAll() appears to be invoked twice).
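One possibility worth checking is that the Flux returned by DemoClient.getAll() is cold (nothing runs until something subscribes) and that DemoClientHandler.getAll() subscribes to it twice: once through hasElements() and once more when the same Flux is written as the response body. That would also fit the Mono case, where the result is consumed only once. The sketch below is a plain-JDK analogy of that behavior, not Reactor code; ColdSource, remoteGetAll, callsFor, and remoteCalls are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ColdSourceDemo {

    /** Hypothetical stand-in for a cold publisher: the producer runs once per subscription. */
    static final class ColdSource<T> {
        private final Supplier<List<T>> producer;

        ColdSource(Supplier<List<T>> producer) {
            this.producer = producer;
        }

        // Each subscription re-runs the producer.
        void subscribe(Consumer<T> onNext) {
            producer.get().forEach(onNext);
        }

        // Rough analogue of hasElements(): it has to run the producer to find out.
        boolean hasElements() {
            return !producer.get().isEmpty();
        }

        // Rough analogue of caching: run the producer on first use only, then replay.
        ColdSource<T> cached() {
            List<T> memo = new ArrayList<>();
            boolean[] loaded = {false};
            return new ColdSource<T>(() -> {
                if (!loaded[0]) {
                    memo.addAll(producer.get());
                    loaded[0] = true;
                }
                return memo;
            });
        }
    }

    // Counts how often the simulated remote handler is invoked.
    static final AtomicInteger remoteCalls = new AtomicInteger();

    // Stands in for the WebClient call that ends up in DemoMainHandler.getAll().
    static ColdSource<String> remoteGetAll() {
        return new ColdSource<String>(() -> {
            remoteCalls.incrementAndGet();
            return List.of("Call", "Me", "Once");
        });
    }

    // Mirrors the shape of DemoClientHandler.getAll(): probe for elements, then emit the body.
    static int callsFor(ColdSource<String> source) {
        remoteCalls.set(0);
        List<String> body = new ArrayList<>();
        if (source.hasElements()) {        // first subscription
            source.subscribe(body::add);   // second subscription
        }
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("uncached: " + callsFor(remoteGetAll()));          // prints "uncached: 2"
        System.out.println("cached:   " + callsFor(remoteGetAll().cached())); // prints "cached:   1"
    }
}
```

If this is what is happening in the real code, Reactor operators such as Flux.cache() or collecting the elements once with collectList() before building the response might avoid the second request, at the cost of buffering the elements in memory. I have not verified either against this demo.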
Console Output
2019-10-07 08:16:18.953 INFO 9384 --- [ main] c.example.testdupe.TestDupeApplication : Starting TestDupeApplication on M7730-LFR with PID 9384 (D:\sandbox\TestDupe\build\classes\java\main started by LesR in D:\sandbox\TestDupe)
2019-10-07 08:16:18.953 INFO 9384 --- [ main] c.example.testdupe.TestDupeApplication : No active profile set, falling back to default profiles: default
2019-10-07 08:16:20.062 INFO 9384 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2019-10-07 08:16:20.062 INFO 9384 --- [ main] c.example.testdupe.TestDupeApplication : Started TestDupeApplication in 1.324 seconds (JVM running for 1.871)
***** Invoke localhost:8080/test/DemoClient/{id}
DemoClientHandler.getById( ServerRequest )
DemoClient.getById( 2 )
DemoMainHandler.getById( ServerRequest )
***** Invoke localhost:8080/test/DemoClient
DemoClientHandler.getAll( ServerRequest )
DemoClientHandler.getAll() >> BEFORE invoking demoClient.getAll()
DemoClient.getAll()
DemoClient.getAll() >> RETURN fluxString
DemoClientHandler.getAll() >> AFTER invoking demoClient.getAll()
DemoMainHandler.getAll( ServerRequest )
DemoMainHandler.getAll( ServerRequest )
Example Code
@SpringBootApplication
public class TestDupeApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestDupeApplication.class, args);
    }
}
@Configuration
public class DemoClientRouter {
    @Bean
    public RouterFunction<ServerResponse> clientRoutes(DemoClientHandler requestHandler) {
        return nest(path("/test"),
                nest(accept(APPLICATION_JSON),
                        RouterFunctions.route(RequestPredicates.GET("/DemoClient"), requestHandler::getAll)
                                .andRoute(RequestPredicates.GET("/DemoClient/{id}"), requestHandler::getById)));
    }
}
@Component
public class DemoClientHandler {
    @Autowired
    DemoClient demoClient;

    public Mono<ServerResponse> getAll(ServerRequest request) {
        System.out.println("DemoClientHandler.getAll( ServerRequest )");
        System.out.println("DemoClientHandler.getAll() >> BEFORE invoking demoClient.getAll()");
        Flux<String> fluxString = demoClient.getAll();
        System.out.println("DemoClientHandler.getAll() >> AFTER invoking demoClient.getAll()");
        return fluxString.hasElements().flatMap(hasElement -> {
            return hasElement ? ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(fluxString, String.class)
                    : ServerResponse.noContent().build();
        });
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        System.out.println("DemoClientHandler.getById( ServerRequest )");
        Mono<String> monoString;
        return demoClient.getById( 2 ).flatMap(stringVal -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(stringVal), String.class))
                .switchIfEmpty(ServerResponse.notFound().build());
    }
}
@Component
public class DemoClient {
    private final WebClient client;

    public DemoClient() {
        client = WebClient.create();
    }

    public Flux<String> getAll() {
        System.out.println("DemoClient.getAll()");
        Flux<String> fluxString;
        fluxString = client.get().uri("http://localhost:8080/test/DemoMain")
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMapMany(response -> response.bodyToFlux(String.class));
        // fluxString = client.get().uri("http://localhost:8080/test/DemoMain")
        //         .accept(MediaType.APPLICATION_JSON)
        //         .retrieve()
        //         .bodyToFlux(String.class);
        System.out.println("DemoClient.getAll() >> RETURN fluxString");
        return fluxString;
    }

    public Mono<String> getById(int id) {
        System.out.printf("DemoClient.getById( %d )%n", id);
        return client.get().uri("http://localhost:8080/test/DemoMain/" + id)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMap(response -> response.bodyToMono(String.class));
    }
}
@Configuration
public class DemoMainRouter {
    @Bean
    public RouterFunction<ServerResponse> demoPOJORoute(DemoMainHandler requestHandler) {
        return nest(path("/test"),
                nest(accept(APPLICATION_JSON),
                        RouterFunctions.route(RequestPredicates.GET("/DemoMain"), requestHandler::getAll)
                                .andRoute(RequestPredicates.GET("/DemoMain/{id}"), requestHandler::getById)));
    }
}
@Component
public class DemoMainHandler {
    public Mono<ServerResponse> getAll(ServerRequest request) {
        System.out.println("DemoMainHandler.getAll( ServerRequest )");
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Flux.just("Call", "Me", "Once"), String.class);
    }

    public Mono<ServerResponse> getById(ServerRequest request) {
        System.out.println("DemoMainHandler.getById( ServerRequest )");
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just("Only One"), String.class);
    }
}
This code was added to support the follow-on discussion...
@Component
public class DemoClient {
    private final WebClient client;

    public DemoClient() {
        client = WebClient.create();
    }

    public Flux<String> getAll() {
        Flux<String> fluxString;
        Mono<ClientResponse> monoCR = client.get().uri("http://localhost:8080/test/DemoMain")
                .accept(MediaType.APPLICATION_JSON)
                .exchange();
        fluxString = monoCR.flatMapMany(clientResponse -> clientResponse.bodyToFlux(String.class));
        // fluxString.subscribe();
        // return fluxString;
        return Flux.just("Foo", "Bar");
    }