1
votes

I have a server listening for data on a UDP port and I am using netty. My server has its own frame decoder, an execution handler, and an business-specific handler. My test client is made with java DatagramSocket (not using netty) and simply sends data before listening for a reply. See examples below.

In my first version I was not sending back any reply in my server and I was able to run thousand of concurrent clients sending requests in repetition without any problem. My server was handling those hundred of thousands of requests perfectly.

Then I had to add a reply for each request received on the server-side in my business-specific handler. I simply followed the examples provided on the netty side, i.e. writting my data in the event buffer. On the client side, I reused the same udp socket that sent the data to receive the response. The mechanism worked very well with one client sending hundred of consecutive requests to my server which answered back properly of each one. However, at the moment my client shutdown (than closing the socket), my server stopped to accept any other request. I had to restart my server to run that process again. Of course, when running more than one concurrent client, only one works well and only one time.

I used wireshark to analyse the data sent and received. All the data is properly sent to the server and the server properly returns the reply on the first run of the client. But at the moment I stop my client to try to re-run it again, my server stops to handle the data. With wireshark, I can see that the data is really sent up to the server, it is really the server itself that stops to handler the data.

Can someone tells me what I did wrong in that example?

Here is a simple version of my client:

public class UDPClientTester
{

    public void sendAndReceiveClientTester() throws IOException
    {
        final InetAddress lIpAddress = InetAddress.getByName( "localhost" );
        int lPort = 50000;
        int lNbrRepeat = 1;

        byte[] lDataToSendAsBytes = {1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0}; //new byte[20];
        byte[] lReceivedData = new byte[22];

        final DatagramSocket udpSocket = new DatagramSocket();

        DatagramPacket lSendPacket = new DatagramPacket( lDataToSendAsBytes, lDataToSendAsBytes.length, lIpAddress, lPort );
        DatagramPacket lReceivePacket = new DatagramPacket( lReceivedData, 22 );

        // No metter the number of repeats, the server will accept all requests and return a reply for each one.
        for ( byte k = 0; k < lNbrRepeat; k++ )
        {
            // Send data...
            udpSocket.send( lSendPacket );

            // Receive response... Block here until a response is received.
            udpSocket.receive( lReceivePacket );            
        }

        udpSocket.close();

        // At the moment I close the socket, the server stop to accept anymore requests. 
        // I cannot run this example again until I restart the server!
    }
}

And here is a simple version of my server, showing only the main() and the handler (not showing the decoder):

public class TesterChannelHandler extends SimpleChannelHandler
{
    public  void main(final String[] args) throws Exception
    {
       DatagramChannelFactory f = new NioDatagramChannelFactory( Executors.newCachedThreadPool() );

       ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);

       // Configure the pipeline factory.
       b.setPipelineFactory(new ChannelPipelineFactory() {

           ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576) )

           public ChannelPipeline getPipeline() throws Exception {
              return Channels.pipeline(
                      new OuterFrameDecoder( null ),
                      executionHandler,
                      new TesterChannelHandler() );
          }
      });

      // Enable broadcast
      b.setOption( "broadcast", "false" );
      b.setOption( "receiveBufferSizePredictorFactory",
                   new FixedReceiveBufferSizePredictorFactory(1024) );

      // Bind to the port and start the service.
      b.bind( new InetSocketAddress(50000) );

    }


    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
    {            
        Channel lChannel = e.getChannel();
        SocketAddress lRemoteAddress = e.getRemoteAddress();

        byte[] replyFrame = {0,9,8,7,6,5,4,3,2,1};

        ChannelBuffer lReceivedBuffer = (ChannelBuffer) e.getMessage();
        ChannelBuffer lReplyBuffer = ChannelBuffers.wrappedBuffer( replyFrame );

        if ( lChannel != null && lChannel.isWritable()  && lRemoteAddress != null )
        {
            // OK, I FOUND THE ERROR!! I DO NOT NEED TO CONNECT THE CHANNEL! I MUST USE THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AND PASS IT TO THE WRITE METHOD BELOW.
            if ( !e.getChannel().isConnected() ) e.getChannel().connect( e.getRemoteAddress() );

            // BELOW I SHOULD PASS THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AS THE SECOND ARGUMENT OF THE WRITE METHOD.
            e.getChannel().write( lReplyBuffer );
        }
    }
}
1

1 Answers

0
votes

I suspect your decoder has some problem so that it store the received data into its internal buffer without finishing decoding. To confirm this, I would add a LoggingHandler to the beginning of the pipeline to log what really has been received (or not). If the LoggingHandler logs that something was received but TesterChannelHandler is not getting the message, it's likely that the decoder is buggy.

Also, please note that your decoder doesn't need to be a FrameDecoder because UDP is a message oriented protocol.