
I'm trying to use Reactor Netty's TcpClient in a reactive way to interact with hosts that may be unreachable. Here is an example of the channel initialization logic:

ConnectionProvider connectionProvider = ConnectionProvider.fixed("fixed", 50);
TcpClient.create(connectionProvider)
  .host(host).port(port)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
  .doOnConnect(x -> log.trace("Connect to {}:{}", host, port))
  .doOnConnected(conn -> log.trace("Connected {}", conn.channel()))
  .connect()
  .subscribe();

The output I am receiving:

2019-09-04 08:23:13.612 TRACE 71988 --- [ioEventLoop-4-3] c.c.pcb.poc.network.tcp.NettyTcpSender   : Connect to
2019-09-04 08:23:13.684  WARN 71988 --- [actor-tcp-nio-4] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.operationComplete()

reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.ConnectTimeoutException: connection timed out: /
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: /
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
Error has been observed by the following operator(s):
    |_  Mono.create ⇢ reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:130)
    |_  Mono.doOnSubscribe ⇢ reactor.netty.tcp.TcpClientDoOn.connect(TcpClientDoOn.java:58)

The 'inbound' and 'outbound' each have a dedicated method for handling their errors, but they work on top of a Connection instance, which is never created if you get a connection timeout.

I tried:

  1. The exception I am receiving is wrapped in 'ErrorCallbackNotImplemented', but I wasn't able to find any way to implement an 'ErrorCallback'.

  2. The log contains a warning from 'io.netty.util.concurrent.DefaultPromise', but I wasn't able to find a way to supply my own Promise to handle it properly.

  3. I haven't found any configuration option that would intercept connection timeouts.

  4. A workaround: creating the connection in a blocking way (.block() instead of .subscribe()) lets me catch any connection-creation exception in a plain try-catch block, but I lose the benefits of the reactive approach.

Can somebody suggest at least something to help me find the right way to handle an 'io.netty.channel.ConnectTimeoutException'?


1 Answer


Do not forget to implement your error callback

Usually reactor.core.Exceptions$ErrorCallbackNotImplemented occurs when you subscribe through a lambda-based .subscribe method (the same applies to Mono and Flux) without supplying an error consumer.

If you look at the sources here and here, you will find the place where reactor.core.Exceptions$ErrorCallbackNotImplemented is thrown.
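As a rough illustration of the point above, here is a minimal sketch of subscribing with an explicit error consumer. It does not open a real connection: failingConnect() is a hypothetical stand-in for the Mono returned by TcpClient.connect(), and it fails with java.net.ConnectException (the JDK superclass of io.netty.channel.ConnectTimeoutException) so that the example only needs reactor-core. The class and method names are made up for this example.

```java
import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

public class SubscribeWithErrorCallback {

    // Hypothetical stand-in for tcpClient.connect(): a Mono that fails
    // immediately, the way a timed-out connect attempt would.
    static Mono<String> failingConnect() {
        return Mono.error(new ConnectException("connection timed out (simulated)"));
    }

    // Subscribes with BOTH a value consumer and an error consumer and
    // returns the error that reached the error consumer (null if none).
    static Throwable subscribeAndCapture(Mono<String> connect) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        connect.subscribe(
            conn -> System.out.println("Connected: " + conn), // success path
            seen::set);                                       // error path
        // Mono.error signals synchronously, so the error is already captured here.
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = subscribeAndCapture(failingConnect());
        System.out.println("Handled: " + error);
    }
}
```

With the second argument in place, the failure is delivered to your own consumer instead of being wrapped in ErrorCallbackNotImplemented. In the question's code, the same shape would presumably be .connect().subscribe(conn -> ..., error -> ...).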

Action Points

To handle the original io.netty.channel.ConnectTimeoutException, I would recommend looking at the Handling Errors section of the official Project Reactor documentation.
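For a flavour of what that section covers, here is a small sketch using two of the error operators on Mono, doOnError and onErrorResume, both filtered by exception type. As an assumption for the sake of a dependency-light example, the timeout is simulated with java.net.ConnectException (which io.netty.channel.ConnectTimeoutException extends) and the connection is represented by a plain String. The names ConnectTimeoutFallback, simulatedTimeout and withFallback are hypothetical.

```java
import java.net.ConnectException;
import reactor.core.publisher.Mono;

public class ConnectTimeoutFallback {

    // Hypothetical stand-in for a connect attempt that times out.
    static Mono<String> simulatedTimeout() {
        return Mono.error(new ConnectException("connection timed out (simulated)"));
    }

    // Logs connect failures and turns them into an empty completion;
    // any other error type is left untouched and keeps propagating.
    static Mono<String> withFallback(Mono<String> connect) {
        return connect
            .doOnError(ConnectException.class,
                e -> System.err.println("Connect failed: " + e.getMessage()))
            .onErrorResume(ConnectException.class, e -> Mono.empty());
    }

    public static void main(String[] args) {
        withFallback(simulatedTimeout()).subscribe(
            conn -> System.out.println("Connected: " + conn),
            err -> System.err.println("Unexpected error: " + err),
            () -> System.out.println("Completed without a connection"));
    }
}
```

Whether to fall back to an empty result, retry, or map the failure to a domain-specific exception depends on what the caller should see when a host is unreachable; the operators shown are only one option from that documentation section.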