1
votes

Custom ByteToMessageEncoder does not receive bytes sent in same tcp connection but in different tcp messages.

I have been assigned to solve a problem where a couple of years old system started misbehaving. From my part there is a tcp server written in netty by some other developers that receives binary messages with a static length header and variable length body. Body length is defined by a a header field that tells the message type. We maintain a map of messagetypes and their lengths.

The problem faced is that after correctly decoding the header and knowing the body length the same decoder is expecting body to come in the same ByteBuf (that is one fireChannelRead event from the byteChannel).

However, sometimes there is not enough stuff in the buffer, so decoder gives up. But next time the decode-method is called the body bytes show up and are wrongly interpreted as the header, thus putting the decoder out-of-sync.

What is the right way using netty to assemble messages, whose bytes might drop in in smaller chunks?

Here is basics of current decoder.

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message message = decode(ctx, in); 
        if (message != null) {
            out.add(message);
        }
    }

    protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < MessageHeader.SIZE) {
            return null;
        }
        ByteBuf headerBytes = in.readBytes(MessageHeader.SIZE);
        MessageHeader header = MessageHeader.decode(headerBytes, new ProcessingTracker(MessagePart.HEADER));
        if (header == null) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        int bodySize = header.getMessageType().getMessageBodySize();
        if (!waitingForBytes(in, bodySize, READ_TRY_TIMES)) {
            ctx.disconnect().sync();
            logger.debug("Disconnected from channel");
            return null;
        }
        ByteBuf messageBytes = in.readBytes(bodySize);
        messageBytes.resetReaderIndex();
        Message message = Message.decode(header, messageBytes, 0);
        return message;
    }


    public boolean waitingForBytes(ByteBuf in, int bodySize, int counter) {
        if (counter == 0) {
            logger.warn("Didn't get enough bytes of message body in MessagDecoder. Giving up and disconnecting from remote peer.");
            return false;
        }
        logger.debug(String.format("Readable bytes in buffer %d, expected %d", in.readableBytes(), bodySize));

        if (in.readableBytes() < bodySize) {

            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return waitingForBytes(in, bodySize, counter - 1);
        } else {
            return true;
        }
    }
2

2 Answers

0
votes

There are multiple problems in your code...

First of you are not allowed to call sync() in your code as this way you will "deadlock" the EventLoop.

Secondly you can't use waitingForBytes here as it will basically stale all other IO on the EventLoop which means you will never continue to do any IO. In an a framework like Netty it is important to never block the EventLoop thread as this will basically cause everything to stale and not make any progress.

0
votes

Looking at the super-class ByteToMessageDecoder it is obvious that a subclass should communicate the decoding progress through the bytes it read or number of messages decoded. I think the persons I inherited this code from missed that point.

This implementation passed some initial tests:

public class MessageDecoder extends ByteToMessageDecoder {

    private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

    private ByteBuf headBuf = Unpooled.buffer(MessageHeader.SIZE);

    private MessageHeader header = null;

    private ByteBuf bodyBuf;

    private int bodylength = 0;

    private int messageBytes = 0;

    private final static int STATE_READ_HEADER = 1, STATE_READ_BODY = 2;

    private int state = STATE_READ_HEADER;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Message message = decode(ctx, in);
        if (message != null) {
            out.add(message);
        }
    }

    protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        int readBytes = 0;
        logstate(in);
        switch (state) {
        case STATE_READ_HEADER:
            if (in.readableBytes() <= MessageHeader.SIZE - messageBytes) {
                readBytes = in.readableBytes();
            } else {
                readBytes = MessageHeader.SIZE - messageBytes;
            }
            headBuf.writeBytes(in, readBytes);
            messageBytes += readBytes;
            if (messageBytes == MessageHeader.SIZE) {
                state = STATE_READ_BODY;
                header = MessageHeader.decode(headBuf, new ProcessingTracker(MessagePart.HEADER));
                bodylength = header.getMessageType().getMessageBodySize();
                bodyBuf = Unpooled.buffer(bodylength);
            }
            break;
        case STATE_READ_BODY:
            if (in.readableBytes() <= bodylength - (messageBytes - MessageHeader.SIZE)) {
                readBytes = in.readableBytes();
            } else {
                readBytes = bodylength - (messageBytes - MessageHeader.SIZE);
            }
            bodyBuf.writeBytes(in, readBytes);
            messageBytes += readBytes;
            if (messageBytes == MessageHeader.SIZE + bodylength) {
                state = STATE_READ_HEADER;
                Message message = Message.decode(header, bodyBuf, 0);
                reset();
                return message;
            }
            break;
        }
        return null;
    }
}