2
votes

I'm quite new to Netty. I want to create a TCP server that performs a custom application-layer handshake when a connection is established. After the handshake, I want to pass the messages (ByteBuf) to a queue so that other threads can process them.

My question is: can I have multiple ChannelInboundHandlerAdapters in the channel pipeline, one for the application-layer handshake protocol and another for passing the messages to the queue? Furthermore, I want to know how messages flow through the pipeline. If a message is received by one handler (or decoder/encoder), how is it passed to the next handler?

Specifically, if I change the EchoServer from here and add another ChannelInboundHandlerAdapter, the echo server handler stops receiving any messages.

ServerBootstrap b = new ServerBootstrap();
b.group(group)
 .channel(NioServerSocketChannel.class)
 .localAddress(new InetSocketAddress(port))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
             @Override
             public void channelRead(ChannelHandlerContext ctx, Object msg) {
             }
         });

         ch.pipeline().addLast(new EchoServerHandler());
     }
 });

My logic is: have two ChannelInboundHandlerAdapters, do the handshake in the first handler and discard packets that do not match the handshake criteria, and then pass the messages to a queue in the second ChannelInboundHandlerAdapter. Is my logic correct? If not, how should it be done?

Thank you very much.


2 Answers

4
votes

ChannelInboundHandlerAdapter is an adapter class for the ChannelInboundHandler interface. To begin with, you can use SimpleChannelInboundHandler (or, with a little more work, write your own handler that extends ChannelInboundHandlerAdapter). SimpleChannelInboundHandler releases the message automatically after channelRead0() returns. Note that releasing a message does not pass it to the next handler in the ChannelPipeline; a message the handler accepts stops there unless you forward it yourself.

For an example that uses the simpler SimpleChannelInboundHandler, see this thread: Netty hello world example not working

So instead of this: ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {}

you write a new class that extends SimpleChannelInboundHandler, like

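import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // No manual release here: SimpleChannelInboundHandler releases "in"
        // after channelRead0() returns.
        System.out.println(in.toString(CharsetUtil.US_ASCII));
    }
}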

and add it to the pipeline like this:

public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new MyHandler());
}

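Keep in mind that SimpleChannelInboundHandler releases the message automatically after channelRead0() returns and does not forward it. If a later handler in the ChannelPipeline also needs the message, call ReferenceCountUtil.retain(msg) and then ctx.fireChannelRead(msg) inside channelRead0().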

If you use ChannelInboundHandlerAdapter, you have to pass the message or event to the next handler yourself.

A handler has to invoke the event propagation methods of ChannelHandlerContext (for example ctx.fireChannelRead(msg)) to forward an event to the next handler. SimpleChannelInboundHandler does this on its own only for messages whose type it does not accept.

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

See the ChannelPipeline documentation: http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
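
To make the forwarding rule concrete, here is a minimal plain-Java model of the design from the question: a handshake handler that drops messages until the handshake succeeds, followed by a handler that puts messages on a queue. This is not Netty code. `Handler`, `Context`, `MiniPipeline`, `HandshakeHandler`, `QueueHandler`, and the `"HELLO"` handshake token are hypothetical names invented for the sketch, and `ctx.fireRead(msg)` only plays the role of Netty's `ctx.fireChannelRead(msg)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of an inbound pipeline; all names are hypothetical, not Netty's API.
interface Handler {
    void read(Context ctx, String msg);
}

// Gives a handler a way to pass a message to the handler after it.
final class Context {
    private final List<Handler> handlers;
    private final int index;

    Context(List<Handler> handlers, int index) {
        this.handlers = handlers;
        this.index = index;
    }

    // Plays the role of ctx.fireChannelRead(msg): invoke the next handler, if any.
    void fireRead(String msg) {
        int next = index + 1;
        if (next < handlers.size()) {
            handlers.get(next).read(new Context(handlers, next), msg);
        }
    }
}

final class MiniPipeline {
    private final List<Handler> handlers = new ArrayList<>();

    MiniPipeline addLast(Handler h) {
        handlers.add(h);
        return this;
    }

    // Entry point: a message always starts at the first handler.
    void receive(String msg) {
        if (!handlers.isEmpty()) {
            handlers.get(0).read(new Context(handlers, 0), msg);
        }
    }
}

// First handler: swallows everything until it sees the (assumed) handshake token.
final class HandshakeHandler implements Handler {
    private boolean done;

    @Override
    public void read(Context ctx, String msg) {
        if (done) {
            ctx.fireRead(msg);           // forward explicitly
        } else if ("HELLO".equals(msg)) {
            done = true;                 // handshake consumed, not forwarded
        }
        // anything else before the handshake is dropped by not forwarding it
    }
}

// Second handler: hands messages to worker threads through a queue.
final class QueueHandler implements Handler {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    @Override
    public void read(Context ctx, String msg) {
        queue.add(msg);
    }
}
```

In real Netty the same shape would be two ChannelInboundHandlerAdapter subclasses whose channelRead() either calls ctx.fireChannelRead(msg) or drops the message. A dropped ByteBuf should also be released, for example with ReferenceCountUtil.release(msg).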

-1
votes

Keep in mind:

A SimpleChannelInboundHandler does not forward the messages it handles, so if you chain several of them, only the first one that accepts a message will see it. After channelRead0() returns, the finally block releases the message instead of passing it on:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            // releases every handled message; it is not forwarded, so the next handler never sees it
            ReferenceCountUtil.release(msg);
        }
    }
}

Use ChannelInboundHandlerAdapter instead:

public class CustomizeChannelInboundHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
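    System.out.println("do something you like!");

    // ChannelInboundHandlerAdapter.channelRead() calls ctx.fireChannelRead(msg),
    // which passes the message to the next handler.
    super.channelRead(ctx, msg);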
  }

}
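
Because the question wants to hand ByteBuf messages to a queue for other threads, the auto-release in the SimpleChannelInboundHandler source above matters: a buffer queued from channelRead0() is released as soon as that method returns unless it was retained first. The following is a toy plain-Java model of that rule. `CountedMsg` and `AutoRelease.autoReleasingRead` are hypothetical stand-ins written for this sketch, not Netty classes.

```java
import java.util.function.Consumer;

// Toy reference-counted message; a stand-in for ByteBuf, not Netty's API.
final class CountedMsg {
    private int refCnt = 1;

    int refCnt() {
        return refCnt;
    }

    // Take an extra reference so the message survives one more release().
    CountedMsg retain() {
        if (refCnt == 0) {
            throw new IllegalStateException("already freed");
        }
        refCnt++;
        return this;
    }

    // Drop one reference; at zero the message counts as freed.
    void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("already freed");
        }
        refCnt--;
    }
}

final class AutoRelease {
    // Mimics the try/finally in SimpleChannelInboundHandler.channelRead():
    // run the user callback, then release the message exactly once.
    static void autoReleasingRead(CountedMsg msg, Consumer<CountedMsg> channelRead0) {
        try {
            channelRead0.accept(msg);
        } finally {
            msg.release();
        }
    }
}
```

Calling `AutoRelease.autoReleasingRead(msg, queue::add)` leaves the queued message with a count of zero, while `AutoRelease.autoReleasingRead(msg, m -> queue.add(m.retain()))` leaves one reference for the consumer thread, which then has to call release() itself. In Netty the corresponding calls are ByteBuf.retain() or ReferenceCountUtil.retain(msg).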