0
votes

Below is my TCPServer Code

public class TCPServer {

private final int port;

public TCPServer(final int port) {
    this.port = port;
}

public void run() throws Exception {
    final ExecutorService threadPool = Executors.newCachedThreadPool();
    final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    final EventLoopGroup workerGroup = new NioEventLoopGroup(50, threadPool);
    try {
        final ServerBootstrap b = new ServerBootstrap(); // (2)

        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
                .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                    @Override
                    public void initChannel(final SocketChannel ch) throws Exception {
                         //ch.pipeline().addLast(new ServerRequestHandler());
                        ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG, 1000) // (5)
                .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

        // Bind and start to accept incoming connections.
        final ChannelFuture f = b.bind(this.port).sync(); // (7) // Start
                                                          // the server

        // Wait until the server socket is closed.
        // In this example, this does not happen, but you can do that to
        // gracefully
        // shut down your server.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        threadPool.shutdown();
    }
}

public static void main(final String[] args) throws Exception {
    int port = 9090;
    if (args.length > 0) {
        port = Integer.parseInt(args[0]);
    }

    new TCPServer(port).run();
}
}

MyRequestHandler Class

public class ServerRequestHandler extends ChannelInboundHandlerAdapter {


@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) { // (2)

    log(" ServerRequestHandler reading message :: " + Thread.currentThread().getName());

    ctx.write(msg);

}
}

My Decoder Class

public class ReqMessageDecoder extends LengthFieldBasedFrameDecoder {

public ReqMessageDecoder(final int maxFrameLength, final int lengthFieldOffset, final int lengthFieldLength,
            final int lengthAdjustment, final int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

}

I have below sample TCP client code which sends a sample message to the above server in a loop for 79 times in sequential manner.

public static void main(final String args[]) {
        int i = 80;
        final String message = "This is a sample message but we can send actual";
        final TestClass testClass = new TestClass();
        testClass.startConnection("localhost", 9090);
        while (i > 1) {
            i--;
            testClass.sendMessageLength(testClass.getMessageLength(message).getBytes());
            testClass.sendMessage(message);

        }
        testClass.stopConnection();
    }

private Socket clientSocket;
private PrintWriter out;
private BufferedReader in;
private OutputStream outputStream;

public String getMessageLength(final String message) {
    final int firstByte = message.length() >> 8;
    final int secondByte = message.length() - (firstByte << 8);
    System.out.println("char 0 " + (char) firstByte + " char 1 " + (char) secondByte);
    final String str = new String(new char[] { (char) firstByte, (char) secondByte });
    System.out.println("firstByte :: " + firstByte + " secondByte :: " + secondByte + "Str :: " + str);
    return str;
}

public void startConnection(final String ip, final int port) {
    try {
        this.clientSocket = new Socket(ip, port);
this.outputStream = this.clientSocket.getOutputStream();
        this.out = new PrintWriter(this.outputStream, true);
        this.in = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream()));
    } catch (final UnknownHostException e) {
        e.printStackTrace();
    } catch (final IOException e) {
        e.printStackTrace();
    }

}

public String sendMessage(final String msg) {
    this.out.println(msg);
    String resp = null;
    try {
        resp = this.in.readLine();
        System.out.println("resp :: " + resp);
    } catch (final IOException e) {
        e.printStackTrace();
    }
    return resp;
}

 public void sendMessageLength(final byte[] msg) {
    try {
        this.outputStream.write(msg);
    } catch (final IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

In my client I am first sending the length of the message as 2 bytes(see getMessageLength) and then the actual message (see sendMessage) on the socket output stream. Now when server receives the message it is printing below.

Output:

    channel Active
 ServerRequestHandler reading message :: pool-1-thread-1 message recieved :: This is a sample message but we can send actualmessage ended
exception caught
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 2562 - discarded
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:387)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:430)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
channel read complete
channel inactive

In my code I am not sending data greater than 50 bytes but not sure why the TCPserver is failing that the frame size is more than 1024. My suspect is that the length calculation is not working as expected.

I am new to Netty and have few additional questions as well.

  1. Does the channelRead method gets called in a separate thread for every message it receives?
  2. Does the decode method gets called in blocking thread where it reads (n) of bytes from the TCP/IP stream in blocking fashion and passes the control to the requestHandler to do channelRead?
  3. How to only accept connections which are created using secure socket on the server side?
  4. I would like to process the messages coming from single client in concurrent threads to increase the throughput of my server... do I have to do anything additional (Spawning threads within channelRead method?) or the NioEventLoopGroup supports this out of the box?
2

2 Answers

0
votes
ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());

Your ReqMessageDecoder expects the maximum length of the frame to be 1024, anything over that would cause an exception. How are you defining your frames boundaries? frame boundary would tell your decoder a complete frame has been received and it can be processed by your application code, otherwise the decoder would keep on reading the incoming data till it hits the max frame size (e.g. 1024) or the frame boundary character/sequence is received.

See the following link for netty's builtin end of line based decoder that you can add to your channel DelimiterBasedFrameDecoder.

0
votes

So after so much of trial and error and going through different links I have solved this question and not my PoC is complete.

new ReqLengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, -2, 2) 
// This will read 1st 2 bytes as length and will skip those to read the next 
// bytes (i.e. length mentioned in 1st 2 bytes)

Does the channelRead method gets called in a separate thread for every message it receives?

Ans: To do this you need to do something like below.

pipeline.addLast(this.threadPool, new ServerRequestHandler());
// this.threadPool refers to UnorderedThreadPoolEventExecutor 
// ServerRequestHandler extends SimpleChannelInboundHandler<T>

see Netty 4. Parallel processing after ByteToMessageCodec

Does the decode method gets called in blocking thread where it reads (n) of bytes from the TCP/IP stream in blocking fashion and passes the control to the requestHandler to do channelRead?

Ans: Yes but these threads are different than the I/O threads. These threads work on the channel pipeline.

How to only accept connections which are created using secure socket on the server side?

Ans: see below basic setup

// some dummy SSL setup. actual production code will have some more
// sophisticated steps.
final SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
pipeline.addLast(sslCtx.newHandler(ch.alloc()));

I would like to process the messages coming from single client in concurrent threads to increase the throughput of my server... do I have to do anything additional (Spawning threads within channelRead method?) or the NioEventLoopGroup supports this out of the box?

Ans: Please refer to Netty multi threading per connection & Netty 4. Parallel processing after ByteToMessageCodec