I try to understand how does reactive programming really work. I prepared simple demo for this purpose: reactive WebClient from Spring Framework sends requests to simple rest api and this client prints name of thread in each operation.
rest api:
@RestController
@SpringBootApplication
public class RestApiApplication {
public static void main(String[] args) {
SpringApplication.run(RestApiApplication.class, args);
}
@PostMapping("/resource")
public void consumeResource(@RequestBody Resource resource) {
System.out.println(String.format("consumed resource: %s", resource.toString()));
}
}
@Data
@AllArgsConstructor
class Resource {
private final Long id;
private final String name;
}
and the most important - reactive web client:
@SpringBootApplication
public class ReactorWebclientApplication {
public static void main(String[] args) {
SpringApplication.run(ReactorWebclientApplication.class, args);
}
private final TcpClient tcpClient = TcpClient.create();
private final WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
.baseUrl("http://localhost:8080")
.build();
@PostConstruct
void doRequests() {
var longs = LongStream.range(1L, 10_000L)
.boxed()
.toArray(Long[]::new);
var longsStream = Stream.of(longs);
Flux.fromStream(longsStream)
.map(l -> {
System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
return new Resource(l, String.format("name %s", l));
})
.filter(res -> {
System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
return !res.getId().equals(11_000L);
})
.flatMap(res -> {
System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
return webClient.post()
.uri("/resource")
.syncBody(res)
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.retrieve()
.bodyToMono(Resource.class)
.doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
.doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
})
.blockLast();
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
private final Long id;
private final String name;
@JsonCreator
Resource(@JsonProperty("id") Long id, @JsonProperty("name") String name) {
this.id = id;
this.name = name;
}
Long getId() {
return id;
}
String getName() {
return name;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Resource{");
sb.append("id=").append(id);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}
And the problem is the behaviour is different than I predicted.
I expected that each call of .map(), .filter() and .flatMap() will be executed on main thread and each call of .doOnSuccess() or .doOnError will be executed on a thread from nio thread pool. So I expected logs that look like:
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)
But the logs I've got are:
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------
and each next log in .map(), .filter() and .flatMap() was done on thread from reactor-http-nio.
Next incomprehensible fact is the ratio between operations executed on main thread and reactor-http-nio is always different. Sometimes all operations .map(), .filter() and .flatMap() are performed on main thread.