Below is my TCPServer Code
public class TCPServer {
private final int port;
public TCPServer(final int port) {
this.port = port;
}
public void run() throws Exception {
final ExecutorService threadPool = Executors.newCachedThreadPool();
final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
final EventLoopGroup workerGroup = new NioEventLoopGroup(50, threadPool);
try {
final ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(final SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new ServerRequestHandler());
ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());
}
}).option(ChannelOption.SO_BACKLOG, 1000) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
final ChannelFuture f = b.bind(this.port).sync(); // (7) // Start
// the server
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to
// gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
threadPool.shutdown();
}
}
public static void main(final String[] args) throws Exception {
int port = 9090;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TCPServer(port).run();
}
}
MyRequestHandler Class
public class ServerRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) { // (2)
log(" ServerRequestHandler reading message :: " + Thread.currentThread().getName());
ctx.write(msg);
}
}
My Decoder Class
public class ReqMessageDecoder extends LengthFieldBasedFrameDecoder {
public ReqMessageDecoder(final int maxFrameLength, final int lengthFieldOffset, final int lengthFieldLength,
final int lengthAdjustment, final int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
I have below sample TCP client code which sends a sample message to the above server in a loop for 79 times in sequential manner.
public static void main(final String args[]) {
int i = 80;
final String message = "This is a sample message but we can send actual";
final TestClass testClass = new TestClass();
testClass.startConnection("localhost", 9090);
while (i > 1) {
i--;
testClass.sendMessageLength(testClass.getMessageLength(message).getBytes());
testClass.sendMessage(message);
}
testClass.stopConnection();
}
private Socket clientSocket;
private PrintWriter out;
private BufferedReader in;
private OutputStream outputStream;
public String getMessageLength(final String message) {
final int firstByte = message.length() >> 8;
final int secondByte = message.length() - (firstByte << 8);
System.out.println("char 0 " + (char) firstByte + " char 1 " + (char) secondByte);
final String str = new String(new char[] { (char) firstByte, (char) secondByte });
System.out.println("firstByte :: " + firstByte + " secondByte :: " + secondByte + "Str :: " + str);
return str;
}
public void startConnection(final String ip, final int port) {
try {
this.clientSocket = new Socket(ip, port);
this.outputStream = this.clientSocket.getOutputStream();
this.out = new PrintWriter(this.outputStream, true);
this.in = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream()));
} catch (final UnknownHostException e) {
e.printStackTrace();
} catch (final IOException e) {
e.printStackTrace();
}
}
public String sendMessage(final String msg) {
this.out.println(msg);
String resp = null;
try {
resp = this.in.readLine();
System.out.println("resp :: " + resp);
} catch (final IOException e) {
e.printStackTrace();
}
return resp;
}
public void sendMessageLength(final byte[] msg) {
try {
this.outputStream.write(msg);
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
In my client I am first sending the length of the message as 2 bytes(see getMessageLength
) and then the actual message (see sendMessage
) on the socket output stream. Now when server receives the message it is printing below.
Output:
channel Active
ServerRequestHandler reading message :: pool-1-thread-1 message recieved :: This is a sample message but we can send actualmessage ended
exception caught
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 2562 - discarded
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:387)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:430)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
channel read complete
channel inactive
In my code I am not sending data greater than 50 bytes but not sure why the TCPserver is failing that the frame size is more than 1024. My suspect is that the length calculation is not working as expected.
I am new to Netty and have few additional questions as well.
- Does the
channelRead
method gets called in a separate thread for every message it receives? - Does the decode method gets called in blocking thread where it reads (n) of bytes from the TCP/IP stream in blocking fashion and passes the control to the requestHandler to do
channelRead
? - How to only accept connections which are created using secure socket on the server side?
- I would like to process the messages coming from single client in concurrent threads to increase the throughput of my server... do I have to do anything additional (Spawning threads within
channelRead
method?) or the NioEventLoopGroup supports this out of the box?