1
votes

I decided to find out how many msg/sec my notebook can prosess from multiple clients. I've changed Echo client/server from examples in next easy way: 1) on the client side I have infinite loop in the channelActive() method. Loop sends messages. 2) on the server side I have channelRead() method that handles incoming messages

When I run my client 2 times (in 2 separet threads), I expect to see how server processes the clients in 2 threads. Instead, the server processes only 1 client connnection, sometimes none at all. I looked at my client threads and find out they can't exit from the while loop in ChannelOutboundBuffer.addFlush() method. I can't understand what I've made wrong. I use netty 4.0.21

EchoClient.java

    public static void main(String[] args) throws Exception {
    // Configure the client.
    final EventLoopGroup group = new NioEventLoopGroup();

    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {

                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new EchoClientHandler());
                            }
                        });

                // Start the client.
                ChannelFuture f = b.connect(HOST, PORT).sync();

                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    };

    for (int i = 0; i < 2; i++) {
        Thread t = new Thread(r, i + " lalala");
        t.start();
    }
}

EchoClientHandler.java

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) {
    while (true) {
        ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt(number);
        ctx.writeAndFlush(time);
        ctx.flush();
    }
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
    ctx.fireChannelReadComplete();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // Close the connection when an exception is raised.
    cause.printStackTrace();
    ctx.close();
    ctx.fireExceptionCaught(cause);
}

}

EchoServer.java

public final class EchoServer {

    public static void main(String[] args) throws Exception {

    // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new EchoServerHandler());
                    }
                });

        // Start the server.
        ChannelFuture f = b.bind(8007).sync();

        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

}

EchoServerHandler.java

@Sharable

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

long max_msg = 10000;
long cur_msg = 0;
long startTime = System.nanoTime();
final int NANOS_IN_SEC = 1000000000;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ReferenceCountUtil.release(msg);

    ++cur_msg;
    if (cur_msg == max_msg) {
        System.out.println("Throughput (msg/sec) : " + max_msg * NANOS_IN_SEC / (System.nanoTime() - startTime));
        cur_msg = 0;
        startTime = System.nanoTime();
    }
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
    ctx.fireChannelReadComplete();
}

}

1

1 Answers

1
votes

You can not have an infinite loop in your channelActive(...) method as this will block the EventLoop which is used for multiple connections. This way you will basically block all processing of events for all Channels that use this EventLoop.