3
votes

I am using the EmbeddedChannel to test my handlers and codecs to process messages in the following format:

 +------------+------------------+----------------+      
 |   Header   |  Payload Length  |    Payload     |
 |  16 bytes  |     2 bytes      |  "Some data"   |     
 +------------+------------------+----------------+

Firstly, what I want to achieve:

  1. Process first 16 bytes by creating an object to store the header details in and add the decoded header object to the AttributeMap of the ChannelHandlerContext for later use;
  2. Wait for/retrieve entire payload data;
  3. Have Header object and entire payload as ByteBuf available on final handler to route message.

I using the following handlers:

  1. ByteToMessageCodec<ByteBuf> to extract the header info and add it to the attribute list.
  2. LengthFieldBasedFrameDecoder to read the payload length and wait for/retrieve entire frame.
  3. SimpleChannelInboundHandler that will use the header object retrieved from attribute list to route the payload accordingly.

When a message is passed to the decode method of the ByteToMessageCodec, the header is processed and extracted correctly. I then continue to add the Header object to the AttributeMap and add the ByteBuf (which has a readableBytes = 2 bytes (payload lenght indicator) + payload length).

Let's say the payload length is 1020 bytes. The message is initially received by the codec will have readableBytes = 16 bytes + 2 bytes + 1020 bytes. The header is read by the decode method and the remainder of the available bytes (1022) is then added to the List<Object> out.

If my understanding is correct, the remainder of the bytes will now be passed to the next handler which is LengthFieldBasedFrameDecoder which will read the length indicator and pass the payload (1020 bytes) to the SimpleChannelHanlder, but I must be mistaken.

The decode method is called again, with the same 1022 bytes that was added to the List<Object> out.

In the JavaDoc of the decode method there is the following:

Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf
has nothing to read when return from this method or till nothing was read from the input ByteBuf.

Does this mean decode will be called until readableBytes == 0?

What will be the most efficient way to pass the rest of the message to the LengthFieldBasedFrameDecoder?

I assume the LengthFieldBasedFrameDecoder needs a ByteBuf as input, so does this mean I need to set the readerIndex = 0 and add a copy of the ByteBuf to the List<Object> out?

Any help/advice/criticism will be appreciated, I want to do this in the cleanest way possible.

Here is my decode method:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    byte [] headerBytes = new byte[HEADER_LENGTH];
    in.readBytes(headerBytes, 0, HEADER_LENGTH);

    Header header = new Header(headerBytes);
    System.out.println("Decoded Header: \n" + header);

    //Set the header attribute so it can be used by routing handlers
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header);
    //pass to next handler
    out.add(in);
}

NOTE: I'm reading Netty in Action MEAP v8

2

2 Answers

3
votes

Does this mean decode will be called until readableBytes == 0?

Basically, yes. A simplified view of a ByteToMessageDecoder looks like this

while (in.isReadable()) {
    int outputSizeBefore = out.size();
    int readableBytesBefore = in.readableBytes();

    callYourDecodeImpl(ctx, in, out);

    int outputSizeAfter = out.size();
    int readableBytesAfter = in.readableBytes();

    boolean didNotDecodeAnything = outputSizeBefore == outputSizeAfter;
    boolean didNotReadAnything = readableBytesBefore == readableBytesAfter;

    if(didNotDecodeAnything && didNotReadAnything) {
        break;
    }

    // next iteration, continue with decoding
}

So, your decoder will continuously read headers until the input buffer is exhausted.

To get the behaviour you want, you have to set the isSingleDecode flag to true:

class MyDecoder extends ByteToMessageDecoder {

    MyDecoder() {
        setSingleDecode(true);
    }

    // your decode impl as before
}

or

MyDecoder decoder = new MyDecoder();
decoder.setSingleDecode(true);

This will stop the loop after your decode implementation decoded something. Now your LengthFieldBasedFrameDecoder will be called with the ByteBuf you added to the out list. The frame decoding works as you described, no need to add a copy to the list. Your SimpleChannelInboundHandler will be called with the payload frame as the msg.

However, you won't be able to read the Header from the AttributeMap in your SimpleChannelInboundHandler since the ChannelHandlerContext is a different one for each channel handler, the atrributes aren't shared.

One way to solve this is to use an event for this. In your decoder, instead of adding the Header to the AttributeMap, send it as an event:

// instead of
// ctx.attr(Header.ATTRIBUTE_KEY).getAndSet(header);
// do this
ctx.fireUserEventTriggered(ChannelAttributes.HEADER);

Then, write your SimpleChannelInboundHandler like so

class MyMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private Header header = null;

    MyMessageHandler() {
          super(true);
    }

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
        if (evt instanceof Header) {
            header = (Header) evt;
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
        if (header != null) {
            System.out.println("header = " + header);
            // continue with header, such as routing...
        }
        header = null;
    }
}

An alternative would be to send both objects down the pipeline and use a ChannelInboundHandlerAdapter instead of a SimpleChannelInboundHandler. In your decoder, instead of adding the Header to the AttributeMap, add it to out:

// ...
out.add(header);
out.add(in);

Then, write your ChannelInboundHandler like so

class MyMessageHandler extends ChannelInboundHandlerAdapter {
    private Header header = null;

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (msg instanceof Header) {
            header = (Header) msg;
            System.out.println("got the header " + header);
        } else if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("got the message " + msg);
            try {
                // continue with header, such as routing...
            } finally {
                ReferenceCountUtil.release(msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}

The LengthFieldBasedFrameDecoder simply ignores messages, that aren't ByteBufs, so your Header will just pass it (given that it does not implement ByteBuf) and arrive at your ChannelInboundHandler. Then, the message will be decoded into the payload frame and passed to your ChannelInboundHandler.

0
votes

As a follow up to knutwalker's answer: I found an alternative way for those who use ByteToMessageCodec that can't implement the setSingleDecode method.

Read out the bytes by in.readRetainedSlice() like following.

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    byte [] headerBytes = new byte[HEADER_LENGTH];
    in.readBytes(headerBytes, 0, HEADER_LENGTH);

    Header header = new Header(headerBytes);
    System.out.println("Decoded Header: \n" + header);

    //Set the header attribute so it can be used by routing handlers
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header);
    //pass to next handler
    int length = in.readShort();
    out.add(in.readRetainedSlice(length));
}

Ian2thedv concerned about the efficency of the copying of bytes, but it's inescapable when the readableBytes is more than your message length, you can't just out.add(in).