3
votes

I am writing an application where both client and server use Netty, and the server should (obviously) support many clients at once. I tried to test this by creating 10000 clients sharing one EventLoopGroup, with everything running on a single machine.

Initially, multiple clients sometimes failed to connect due to a timeout. Increasing CONNECT_TIMEOUT_MILLIS on the client and setting SO_BACKLOG to numberOfClients on the server fixed that problem. However, I still sometimes get "connection reset by peer" or

io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: localhost/127.0.0.1:8080
    at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source)
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
    ... 1 more

on the client side (especially when I increase the number of clients). The output of the server-side LoggingHandler doesn't show any connection attempt from the ports those channels bind to on the client side. Using the Nio* channel types instead of Epoll* didn't help either.

Are there other options that need to be set to allow more connections (probably on the server side, if it really is the one refusing/resetting connections)?

To simplify the situation, I removed my own logic: the clients just connect over WebSocket and close the channel after the handshake succeeds. To my understanding, Netty shouldn't normally have problems handling 10000 simultaneous WebSocket connections that don't do much.

The ulimit -n is 1000000, ulimit -u is 772794, so neither should be a problem.
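For completeness, ulimit is not the only relevant limit on Linux: the SO_BACKLOG value is silently capped by net.core.somaxconn, a burst of simultaneous connects can overflow the half-open (SYN) queue governed by net.ipv4.tcp_max_syn_backlog, and each client connecting to the same local address consumes an ephemeral source port. A quick sketch to dump these limits so they can be ruled out too (Linux-only; the /proc paths are the standard ones):

```kotlin
import java.io.File

// Read a kernel tunable from /proc (Linux-only helper for this test).
fun readSysctl(path: String): String = File(path).readText().trim()

fun main() {
    // SO_BACKLOG is capped by this value; if it is e.g. 128, passing
    // numberOfClients to SO_BACKLOG has no effect beyond 128.
    println("somaxconn = " + readSysctl("/proc/sys/net/core/somaxconn"))
    // Each client needs its own ephemeral source port; 10000 clients
    // to one server address can exhaust a narrow range.
    println("port range = " + readSysctl("/proc/sys/net/ipv4/ip_local_port_range"))
    // Limit on half-open connections, relevant during a connect burst.
    println("tcp_max_syn_backlog = " + readSysctl("/proc/sys/net/ipv4/tcp_max_syn_backlog"))
}
```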

Here is the code (in Kotlin, but a Java translation should be straightforward):

package netty

import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.epoll.EpollSocketChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.junit.Test
import java.net.URI

@Suppress("OverridingDeprecatedMember")
class NettyTest {
    private fun channelInitializer(f: (Channel) -> Unit) = object : ChannelInitializer<Channel>() {
        override fun initChannel(ch: Channel) {
            f(ch)
        }
    }

    private val numberOfClients = 10000
    private val maxHttpContentLength = 65536

    @Test
    fun manyClients() {
        // set up server
        val bossLoopGroup = EpollEventLoopGroup(1)
        val workerLoopGroup = EpollEventLoopGroup()
        val serverChannelFactory = ChannelFactory { EpollServerSocketChannel() }
        val clientLoopGroup = EpollEventLoopGroup()
        val clientChannelFactory = ChannelFactory { EpollSocketChannel() }
        val serverChannel = ServerBootstrap()
                .channelFactory(serverChannelFactory)
                .group(bossLoopGroup, workerLoopGroup)
                .handler(LoggingHandler(LogLevel.DEBUG))
                .childHandler(channelInitializer {
                    it.pipeline().addLast(
                            HttpServerCodec(),
                            HttpObjectAggregator(maxHttpContentLength),
                            WebSocketServerCompressionHandler(),
                            WebSocketServerProtocolHandler("/", null, true, maxHttpContentLength)/*,
                            myServerHandler*/
                    )
                })
                .option(ChannelOption.SO_BACKLOG, numberOfClients)
                .bind("localhost", 8080).sync().channel()
        println("Server started")

        try {
            // set up clients
            val url = URI("ws://localhost")
            val futures = List(numberOfClients) { clientIndex ->
                val handshaker = WebSocketClientHandshakerFactory.newHandshaker(url, WebSocketVersion.V13, null, true, null)
                val promise = clientLoopGroup.next().newPromise<Channel>()

                val connectFuture = Bootstrap()
                        .channelFactory(clientChannelFactory)
                        .group(clientLoopGroup)
                        .handler(channelInitializer {
                            it.pipeline().addLast(
                                    HttpClientCodec(),
                                    HttpObjectAggregator(maxHttpContentLength),
                                    WebSocketClientCompressionHandler.INSTANCE,
                                    WebSocketClientProtocolHandler(handshaker, true),
                                    object : ChannelInboundHandlerAdapter() {
                                        override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
                                            if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                                                promise.setSuccess(ctx.channel())
                                                println("Client $clientIndex handshake successful")
                                            }
                                        }

                                        override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                                            promise.setFailure(cause)
                                        }
                                    })
                        })
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120000)
                        .connect("localhost", 8080)
                Pair(promise, connectFuture)
            }
            for ((promise, connectFuture) in futures) {
                connectFuture.sync()
                try {
                    promise.sync()
                } finally { connectFuture.channel().close().sync() }
            }
        } finally {
            try { serverChannel.close().sync() } finally {
                workerLoopGroup.shutdownGracefully()
                bossLoopGroup.shutdownGracefully()
                clientLoopGroup.shutdownGracefully()
            }
        }
    }
}
1
Very interesting. To my understanding all WebSocket handshakes are currently being performed by IO threads. Have you tried to move all the handshakes off the IO threads and onto another EventLoopGroup? - St.Antario
Actually, performing the handshakes has to be done in an IO thread, doesn't it? Creating the request and response could be pulled out, but that would mean not being able to use WebSocket*ProtocolHandler (though most of their code could be reused). - Alexey Romanov
Agree, didn't pay enough attention to the problem. Btw, have you found a solution? What was the problem actually about? - St.Antario
Not yet, unfortunately. - Alexey Romanov
Hi Alexey, did you have any solution for this problem? - Oleg Ushakov

1 Answer

0
votes

There is only 1 thread accepting incoming connections: bossLoopGroup = EpollEventLoopGroup(1). Maybe that is not enough to accept the swarm of client connections.

I suggest sharing one single EventLoopGroup as boss, worker, and client group, with the default number of threads (Netty kindly takes the number of cores into account). That way you won't have underused or overused thread pools.

If you want to run your test with different thread pools, create them with explicit sizes, using more than 1 thread for your bossLoopGroup.
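A minimal sketch of the shared-group setup, reusing the types from the question (handlers and options are elided; this is illustrative, not a drop-in replacement for the test):

```kotlin
import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelFactory
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.epoll.EpollSocketChannel

// One group, sized by Netty according to the number of available cores,
// shared between the server (boss + worker) and all clients.
val sharedGroup = EpollEventLoopGroup()

val serverBootstrap = ServerBootstrap()
        .channelFactory(ChannelFactory { EpollServerSocketChannel() })
        .group(sharedGroup, sharedGroup) // boss and worker share the group
        // ...childHandler and options as in the question...

val clientBootstrap = Bootstrap()
        .channelFactory(ChannelFactory { EpollSocketChannel() })
        .group(sharedGroup)              // clients reuse the same group
        // ...handler and options as in the question...

// Shut down once at the end: sharedGroup.shutdownGracefully()
```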