2
votes

I am encountering a very weird issue with my program which is using Netty.

I am listening on a port and parsing the messages (using a FrameDecoder implementation). Everything works fine when I receive one connection, but when I receive two connections on the same port (each one from a different server), I hit a rare but critical situation where I get a CorruptedFrameException.

The issue occurs when my program receives TCP packets with the exact same timestamp (when information is sent at a very high rate), in the following order:

  1. TCP Packet from Server 1
  2. TCP Packet from Server 2
  3. TCP Packet from Server 1 (which is a continuation of the message started in bullet 1)
  4. TCP Packet from Server 2 (which is a continuation of the message started in bullet 2)

My program tries to parse 1 and 2 as one message, instead of knowing that the actual messages are 1 & 3 and 2 & 4. I read somewhere that maybe I need to instantiate a new FrameDecoder for every channel connection, but I don't know exactly how to do that. I am adding the decoder to the pipeline at startup, and I cannot figure out how to add a new one to a specific channel.

The exceptions I am experiencing are:

org.jboss.netty.handler.codec.frame.CorruptedFrameException: Adjusted frame length (0) is less than lengthFieldEndOffset: 2
    at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:340) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:345) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:332) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:323) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:275) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:196) ~[netty-3.1.5.GA.jar:]
    at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) ~[netty-3.1.5.GA.jar:]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:651) [na:1.5.0]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:676) [na:1.5.0]
    at java.lang.Thread.run(Thread.java:595) [na:1.5.0]

and

org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 4096: 8224
        at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:296) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:345) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:332) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:323) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:275) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:196) ~[netty-3.1.5.GA.jar:]
        at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) ~[netty-3.1.5.GA.jar:]
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:651) [na:1.5.0]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:676) [na:1.5.0]
        at java.lang.Thread.run(Thread.java:595) [na:1.5.0]
2
"maybe I need to instantiate a new FrameDecoder for every channel". No maybe about it. That's exactly what you need to do. – user207421
I figured, but how do I do it? I am adding the decoder to the ChannelPipeline object with addLast("decoder", decoderObject) upon initialization, but when a new channel is connecting I am at the point of channelConnected(ChannelHandlerContext ctx, ChannelStateEvent t) ... If I instantiate a new decoder, how do I assign it to the channel? Something is missing in the puzzle :) – ArnonSe
That's the ChannelPipelineFactory's job. Define one that creates your decoder and then either use a bootstrap (with the channel factory) or pass the pipeline factory to the channel factory when requesting a new channel. – Nicholas

2 Answers

1
votes

You need to add a new frame decoder instance to each new (second?) channel. The exceptions occur because both channels are using the same decoder instance.

  1. The server bootstrap takes a pipeline factory, which configures every new channel with the appropriate encoders and decoders.
  2. Whenever a remote client connects to your server, it gets its own channel object.
  3. In the ChannelPipelineFactory implementation you are using, make sure that a new instance of the frame decoder is created each time the getPipeline() method is invoked.
  4. The @Sharable annotation is for documentation purposes only. Even if you use it, it will not affect anything, since it is not code or logic. It is just a marker.
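
The steps above can be illustrated without Netty. The following is a minimal sketch, not Netty code: `LengthPrefixedDecoder`, `ConnectionRouter`, and `InterleavingDemo` are hypothetical names, and the 2-byte length prefix is an assumed wire format. The router plays the part of the pipeline factory by asking a supplier for a decoder the first time it sees a connection. In Netty 3 the equivalent place to create the decoder would be inside `ChannelPipelineFactory.getPipeline()`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful frame decoder (not Netty's class):
// each frame is a 2-byte big-endian payload length followed by the payload.
final class LengthPrefixedDecoder {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Buffers one TCP chunk and returns the frames it completes.
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] data = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (data.length - pos >= 2) {
            int len = ((data[pos] & 0xFF) << 8) | (data[pos + 1] & 0xFF);
            if (data.length - pos - 2 < len) {
                break; // payload incomplete, wait for more bytes
            }
            frames.add(new String(data, pos + 2, len, StandardCharsets.ISO_8859_1));
            pos += 2 + len;
        }
        pending.reset();
        pending.write(data, pos, data.length - pos); // keep the unread tail
        return frames;
    }
}

// Plays the role of a pipeline factory: asks the supplier for a decoder
// the first time a connection id is seen.
final class ConnectionRouter {
    private final Supplier<LengthPrefixedDecoder> decoderFactory;
    private final Map<String, LengthPrefixedDecoder> byConnection = new HashMap<>();

    ConnectionRouter(Supplier<LengthPrefixedDecoder> decoderFactory) {
        this.decoderFactory = decoderFactory;
    }

    List<String> onData(String connectionId, byte[] chunk) {
        return byConnection.computeIfAbsent(connectionId, id -> decoderFactory.get()).feed(chunk);
    }
}

final class InterleavingDemo {
    // Builds one frame: 2-byte length prefix plus payload bytes.
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.ISO_8859_1);
        byte[] out = new byte[body.length + 2];
        out[0] = (byte) (body.length >> 8);
        out[1] = (byte) body.length;
        System.arraycopy(body, 0, out, 2, body.length);
        return out;
    }

    // Delivers two frames, each split in two chunks, in the order 1a, 2a, 1b, 2b.
    static List<String> decodeInterleaved(boolean shareOneDecoder) {
        LengthPrefixedDecoder single = new LengthPrefixedDecoder();
        Supplier<LengthPrefixedDecoder> factory =
                shareOneDecoder ? () -> single : LengthPrefixedDecoder::new;
        ConnectionRouter router = new ConnectionRouter(factory);
        byte[] a = frame("hello from server 1");
        byte[] b = frame("hello from server 2");
        List<String> frames = new ArrayList<>();
        frames.addAll(router.onData("server1", Arrays.copyOfRange(a, 0, 10)));
        frames.addAll(router.onData("server2", Arrays.copyOfRange(b, 0, 10)));
        frames.addAll(router.onData("server1", Arrays.copyOfRange(a, 10, a.length)));
        frames.addAll(router.onData("server2", Arrays.copyOfRange(b, 10, b.length)));
        return frames;
    }

    public static void main(String[] args) {
        System.out.println("separate decoders: " + decodeInterleaved(false));
        System.out.println("shared decoder:    " + decodeInterleaved(true));
    }
}
```

With a fresh decoder per connection, both messages come out intact. With the single shared instance, the buffered bytes of the two streams are mixed and the decoder emits garbage, then treats payload bytes as a length field.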
0
votes

You need to "NOT share" the FrameDecoder between channels. FrameDecoder is not annotated with @Sharable, so you can't share it. That's the cause of your problem here.
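
One value in the question's second stack trace looks consistent with this diagnosis: 8224 is 0x2020, which is what two ASCII space characters read as when taken as a big-endian 16-bit length. The sketch below only shows that arithmetic. The 2-byte length field at offset 0 is an inference from `lengthFieldEndOffset: 2` in the first trace, not something the asker stated, and `MisreadLength` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MisreadLength {
    // Reads the first two bytes of a chunk as an unsigned big-endian length,
    // as a decoder with a 2-byte length field at offset 0 would (assumed layout).
    static int lengthField(byte[] chunk) {
        return ByteBuffer.wrap(chunk).getShort() & 0xFFFF;
    }

    public static void main(String[] args) {
        // Payload text that happens to start with two spaces, landing where
        // the decoder expects a length field.
        byte[] payloadText = "  padded text".getBytes(StandardCharsets.US_ASCII);
        System.out.println(lengthField(payloadText)); // 0x2020 = 8224
    }
}
```

If the decoder's buffer holds bytes from two connections, the position where it expects the next length field can fall in the middle of message text, which would produce exactly this kind of implausible frame length.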