I am writing an application where both client and server are written using Netty and the server should (obviously) support many clients at once. I've tried to test it by creating 1000 clients sharing one EventLoopGroup and running everything on a single machine.
Initially I had multiple clients sometimes failing to connect due to a timeout. Increasing CONNECT_TIMEOUT_MILLIS on the client and setting SO_BACKLOG to numberOfClients on the server fixed this problem. However, I still get connection reset by peer or
io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: localhost/127.0.0.1:8080
at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source)
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
... 1 more
on the client side sometimes (especially when I increase the number of clients). The output of the server-side LoggingHandler doesn't seem to show any connection attempt from the ports those channels bind to on the client side. Using Nio* instead of Epoll* types didn't help either.
Are there other options which need to be set to allow more connections (probably on server side, if it's really the one refusing/resetting connections)?
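One server-side limit that may be relevant here (not confirmed as the cause for this setup): on Linux the backlog passed to listen() is silently capped at the value in /proc/sys/net/core/somaxconn, so SO_BACKLOG = numberOfClients may not take full effect. A minimal sketch for checking this, assuming a Linux host; readSomaxconn and effectiveBacklog are hypothetical helper names, not part of Netty:

```kotlin
import java.nio.file.Files
import java.nio.file.Paths

// Hypothetical helper: reads the kernel cap on listen() backlogs.
// Returns null when the file is missing or unreadable (e.g. on non-Linux systems).
fun readSomaxconn(path: String = "/proc/sys/net/core/somaxconn"): Int? =
    try {
        Files.readAllLines(Paths.get(path)).firstOrNull()?.trim()?.toIntOrNull()
    } catch (e: Exception) {
        null
    }

// Hypothetical helper: the backlog the kernel would actually apply
// for a requested SO_BACKLOG value, given the cap (if known).
fun effectiveBacklog(requested: Int, somaxconn: Int?): Int =
    if (somaxconn == null) requested else minOf(requested, somaxconn)

fun main() {
    val requested = 10000 // same value as numberOfClients in the test
    val cap = readSomaxconn()
    println("requested=$requested somaxconn=$cap effective=${effectiveBacklog(requested, cap)}")
}
```

If the effective value turns out to be much smaller than the number of clients, the accept queue could overflow while all clients connect at once, which would be one possible explanation for refused or reset connections.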
To simplify the situation, I removed my own logic, so the clients just connect over websocket and close the channel after handshake succeeds. To my understanding, Netty shouldn't normally have problems handling 10000 simultaneous websocket connections which don't do much.
The ulimit -n is 1000000, ulimit -u is 772794, so neither should be a problem.
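Since the value printed by ulimit -n in a shell is not necessarily the limit the test JVM runs with (for example when the test is launched from an IDE or a build daemon), the limit can also be read from inside the process. With everything on one machine, each connection uses a descriptor on both the client and the server side. A small sketch, assuming a HotSpot/OpenJDK runtime on a Unix-like system; fdLimits is a hypothetical helper name:

```kotlin
import com.sun.management.UnixOperatingSystemMXBean
import java.lang.management.ManagementFactory

// Hypothetical helper: returns (open descriptors, max descriptors) for this JVM,
// or null when the platform MXBean does not expose them (e.g. on Windows).
fun fdLimits(): Pair<Long, Long>? {
    val os = ManagementFactory.getOperatingSystemMXBean()
    return if (os is UnixOperatingSystemMXBean) {
        Pair(os.openFileDescriptorCount, os.maxFileDescriptorCount)
    } else {
        null
    }
}

fun main() {
    val limits = fdLimits()
    if (limits == null) {
        println("File descriptor counts are not available on this platform")
    } else {
        println("open=${limits.first} max=${limits.second}")
    }
}
```

Calling fdLimits() at the start of the test would show whether the JVM really sees the 1000000 limit.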
Here is the code (in Kotlin, but Java translation should be clear):
package netty
import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
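import io.netty.channel.*
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.epoll.EpollSocketChannel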
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.junit.Test
import java.net.URI

@Suppress("OverridingDeprecatedMember")
class NettyTest {
    private fun channelInitializer(f: (Channel) -> Unit) = object : ChannelInitializer<Channel>() {
        override fun initChannel(ch: Channel) {
            f(ch)
        }
    }

    private val numberOfClients = 10000
    private val maxHttpContentLength = 65536

    @Test
    fun manyClients() {
        // set up server
        val bossLoopGroup = EpollEventLoopGroup(1)
        val workerLoopGroup = EpollEventLoopGroup()
        val serverChannelFactory = ChannelFactory { EpollServerSocketChannel() }
        val clientLoopGroup = EpollEventLoopGroup()
        val clientChannelFactory = ChannelFactory { EpollSocketChannel() }
        val serverChannel = ServerBootstrap().channelFactory(serverChannelFactory).group(bossLoopGroup, workerLoopGroup).handler(LoggingHandler(LogLevel.DEBUG)).childHandler(channelInitializer {
            it.pipeline().addLast(
                    HttpServerCodec(),
                    HttpObjectAggregator(maxHttpContentLength),
                    WebSocketServerCompressionHandler(),
                    WebSocketServerProtocolHandler("/", null, true, maxHttpContentLength)/*,
                    myServerHandler*/
            )
        }).option(ChannelOption.SO_BACKLOG, numberOfClients).bind("localhost", 8080).sync().channel()
        println("Server started")
        try {
            // set up clients
            val url = URI("ws://localhost")
            val futures = List(numberOfClients) { clientIndex ->
                val handshaker = WebSocketClientHandshakerFactory.newHandshaker(url, WebSocketVersion.V13, null, true, null)
                val promise = clientLoopGroup.next().newPromise<Channel>()
                val connectFuture = Bootstrap().channelFactory(clientChannelFactory).group(clientLoopGroup).handler(channelInitializer {
                    it.pipeline().addLast(
                            HttpClientCodec(),
                            HttpObjectAggregator(maxHttpContentLength),
                            WebSocketClientCompressionHandler.INSTANCE,
                            WebSocketClientProtocolHandler(handshaker, true),
                            object : ChannelInboundHandlerAdapter() {
                                override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
                                    if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                                        promise.setSuccess(ctx.channel())
                                        println("Client $clientIndex handshake successful")
                                    }
                                }

                                override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                                    promise.setFailure(cause)
                                }
                            })
                }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120000).connect("localhost", 8080)
                Pair(promise, connectFuture)
            }
            for ((promise, connectFuture) in futures) {
                connectFuture.sync()
                try {
                    promise.sync()
                } finally { connectFuture.channel().close().sync() }
            }
        } finally {
            try { serverChannel.close().sync() } finally {
                workerLoopGroup.shutdownGracefully()
                bossLoopGroup.shutdownGracefully()
                clientLoopGroup.shutdownGracefully()
            }
        }
    }
}
EventLoopGroup? - St.Antario
WebSocket*ProtocolHandler (though most code from them could be reused). - Alexey Romanov