I decided to find out how many msg/sec my notebook can prosess from multiple clients. I've changed Echo client/server from examples in next easy way: 1) on the client side I have infinite loop in the channelActive() method. Loop sends messages. 2) on the server side I have channelRead() method that handles incoming messages
When I run my client 2 times (in 2 separet threads), I expect to see how server processes the clients in 2 threads. Instead, the server processes only 1 client connnection, sometimes none at all. I looked at my client threads and find out they can't exit from the while loop in ChannelOutboundBuffer.addFlush() method. I can't understand what I've made wrong. I use netty 4.0.21
EchoClient.java
public static void main(String[] args) throws Exception {
// Configure the client.
final EventLoopGroup group = new NioEventLoopGroup();
Runnable r = new Runnable() {
@Override
public void run() {
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
};
for (int i = 0; i < 2; i++) {
Thread t = new Thread(r, i + " lalala");
t.start();
}
}
EchoClientHandler.java
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
while (true) {
ByteBuf time = ctx.alloc().buffer(4);
time.writeInt(number);
ctx.writeAndFlush(time);
ctx.flush();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
ctx.fireChannelReadComplete();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
EchoServer.java
public final class EchoServer {
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(8007).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler.java
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
long max_msg = 10000;
long cur_msg = 0;
long startTime = System.nanoTime();
final int NANOS_IN_SEC = 1000000000;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
++cur_msg;
if (cur_msg == max_msg) {
System.out.println("Throughput (msg/sec) : " + max_msg * NANOS_IN_SEC / (System.nanoTime() - startTime));
cur_msg = 0;
startTime = System.nanoTime();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
ctx.fireChannelReadComplete();
}
}