Custom ByteToMessageDecoder does not receive bytes sent in the same TCP connection but in different TCP messages.
I have been assigned to solve a problem where a system that is a couple of years old started misbehaving. On my side there is a TCP server, written in Netty by some other developers, that receives binary messages with a fixed-length header and a variable-length body. The body length is determined by a header field that gives the message type. We maintain a map of message types and their body lengths.
The problem is that after correctly decoding the header and determining the body length, the decoder expects the body to arrive in the same ByteBuf (that is, in the same fireChannelRead event from the byteChannel).
However, sometimes the buffer does not yet contain the whole body, so the decoder gives up. The next time the decode method is called, the body bytes show up and are wrongly interpreted as a header, which puts the decoder out of sync.
What is the right way in Netty to assemble messages whose bytes might arrive in smaller chunks?
Here are the basics of the current decoder.
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Message message = decode(ctx, in);
    if (message != null) {
        out.add(message);
    }
}

protected Message decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < MessageHeader.SIZE) {
        return null;
    }
    ByteBuf headerBytes = in.readBytes(MessageHeader.SIZE);
    MessageHeader header = MessageHeader.decode(headerBytes, new ProcessingTracker(MessagePart.HEADER));
    if (header == null) {
        ctx.disconnect().sync();
        logger.debug("Disconnected from channel");
        return null;
    }
    int bodySize = header.getMessageType().getMessageBodySize();
    if (!waitingForBytes(in, bodySize, READ_TRY_TIMES)) {
        ctx.disconnect().sync();
        logger.debug("Disconnected from channel");
        return null;
    }
    ByteBuf messageBytes = in.readBytes(bodySize);
    messageBytes.resetReaderIndex();
    Message message = Message.decode(header, messageBytes, 0);
    return message;
}

public boolean waitingForBytes(ByteBuf in, int bodySize, int counter) {
    if (counter == 0) {
        logger.warn("Didn't get enough bytes of message body in MessagDecoder. Giving up and disconnecting from remote peer.");
        return false;
    }
    logger.debug(String.format("Readable bytes in buffer %d, expected %d", in.readableBytes(), bodySize));
    if (in.readableBytes() < bodySize) {
        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return waitingForBytes(in, bodySize, counter - 1);
    } else {
        return true;
    }
}
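
To make the desired behaviour concrete, here is a minimal, Netty-free sketch of chunk-tolerant framing: bytes are accumulated across calls, the header is only peeked at, and nothing is consumed until the whole frame (header plus body) is present. The class name FrameAssembler, the 1-byte header, and the type-to-length table are all hypothetical stand-ins for the real MessageHeader and message-type map, not part of the actual system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Accumulates raw chunks and emits only complete frames (header + body). */
final class FrameAssembler {
    // Assumption for illustration: the header is a single message-type byte.
    static final int HEADER_SIZE = 1;
    // Assumption for illustration: type 1 has a 4-byte body, type 2 has none.
    static final Map<Integer, Integer> BODY_SIZE = Map.of(1, 4, 2, 0);

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    /** Appends a chunk and returns every frame that is now complete. */
    List<byte[]> feed(byte[] chunk) {
        // Join the leftover bytes from earlier calls with the new chunk.
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (joined.length - pos >= HEADER_SIZE) {
            int type = joined[pos] & 0xFF; // peek at the header without consuming it
            Integer bodySize = BODY_SIZE.get(type);
            if (bodySize == null) {
                throw new IllegalStateException("Unknown message type " + type);
            }
            int frameSize = HEADER_SIZE + bodySize;
            if (joined.length - pos < frameSize) {
                break; // body incomplete: keep the header and wait for more bytes
            }
            frames.add(Arrays.copyOfRange(joined, pos, pos + frameSize));
            pos += frameSize;
        }
        // Keep whatever is left (possibly a header plus a partial body).
        pending = Arrays.copyOfRange(joined, pos, joined.length);
        return frames;
    }
}
```

Feeding the bytes {1}, then {10, 11}, then {12, 13, 2} yields no frames for the first two calls and two frames on the third. As far as I understand, ByteToMessageDecoder already does this accumulation itself: it keeps unread bytes between calls and invokes decode again when more data arrives. So the Netty equivalent would presumably be to return from decode without consuming anything when the body is short, for example by checking readableBytes() against header plus body size before reading, or by calling markReaderIndex() before reading the header and resetReaderIndex() when the body has not arrived yet. Sleeping inside decode cannot help, because it blocks the very event-loop thread that would deliver the missing bytes.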