4
votes

The code I'm analyzing creates UDP server with Netty NioDatagramChannelFactory. It creates a thread pool with:

ExecutorService threadPool = Executors.newCachedThreadPool();

Then the datagram channel, pipelineFactory & bootstrap:

int workerCount = 10;
DatagramChannelFactory datagramChannelFactory = new NioDatagramChannelFactory(threadPool, workerCount);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory();

ConnectionlessBootstrap bootStrap = new ConnectionlessBootstrap(datagramChannelFactory);
bootStrap.setPipelineFactory(pipelineFactory);
bootStrap.bind(new InetSocketAddress(host, port));

In the pipelineFactory, the getPipeline() adds the custom handlers.

Just like it is said in: Multi-threaded Handling of UDP Messages

There is only one thread processing the received messages. In the logs, the thread names appears as New I/O datagram worker #1 like:

2012-04-20 09:20:51,853 New I/O datagram worker #1'-'1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:42] messageReceived | Processing: V1TRAP[reqestID=0, ...]

I read the documentation and this entry: Lot of UDP requests lost in UDP server with Netty

And then I changed a bit the code according to those entries. Now the thread pool is created with:

int corePoolSize = 5;
ExecutorService threadPool = new OrderedMemoryAwareThreadPoolExecutor(corePoolSize, 1048576, 1048576);

And the pipelineFactory with and ExecutionHandler:

ExecutionHandler executionHandler = new ExecutionHandler(threadPool);
ChannelPipelineFactory pipelineFactory = new SNMPTrapsPipeLineFactory(executionHandler);

And the getPipeline() adds the handler like described:

public class SNMPTrapsPipeLineFactory implements ChannelPipelineFactory {

    private ExecutionHandler executionHandler = null;

    public SNMPTrapsPipeLineFactory(ExecutionHandler executionHandler) { 
        this.executionHandler = executionHandler;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {

        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addFirst("ExecutorHandler", executionHandler);

        // Here the custom handlers are added
        pipeline.addLast( ... )
    }

Now, I get 4 different thread names in the logs. They appears as pool-2-thread-1, pool-2-thread-2, etc...

For example:

2012-05-09 09:12:19,589 pool-2-thread-1 INFO [c.e.m.r.s.h.SNMPTrapsRequestHandler:46] messageReceived | Processing: V1TRAP[reqestID=0, ...]

But they are not processed concurrently. The processing under messageReceived() has to finish on one thread for the next to process the next message. I sent a buch of messages from different clients to the server, and the logs I get are not interlaced. I also tried to Thread.sleep() inside messageReceived(), and confirms the previous.

Am I missing something? Is there a way to achieve a REAL multi-threaded UDP server with Netty? How can I get different threads to execute messageReceived() concurrently?

2
If am am not mistaken, then OrderedMemoryAwareThreadPoolExecutor execute requests from the same client in the same thread.kofemann

2 Answers

1
votes

Based on my experience and my understanding of Netty with UDP, it is normal that there is only one thread that handles the UDP messages for decoding. Since UDP is session-less, only one thread can receive the data on one UDP port and decode it.

Once you have decoded your data and wrapped it into a buffer or a specific java object, then you can put that object into a pool of threads that will handle it (execution handler -> your business handler). Then new upcoming data on the UDP port can then be decoded once you released the previous decoded data into the execution handler.

The thread of pools you can specify when creating the NioDatagramChannelFactory is used only when you listen for data on more than one port. Only one thread per port make sense. Even if you specify 100 workers in that constructor, only one will be used if you configured one UDP port.

0
votes

One thing that jumped out at me is that you put your execution handler first in the pipeline. I believe the intent is that the entire pipeline up to the "application" handler should be executed by the IO threads performing the IO and the decoding.

Therefore, I would assert that you would want to add all of your SNMPTrap decoding handlers first, and then, when you have an actual SNMPTrap, it gets handed off to the execution handler which in turn passes the trap to the actual consumers of the traps to do something useful with.

@Override
public ChannelPipeline getPipeline() throws Exception {

    ChannelPipeline pipeline = Channels.pipeline(
         new SomethingSomethingDecoder(),
         new SNMPTrapDecoder(),
         executionHandler.
         snmpTrapConsumerHandler
    );
}

At least, that's how it is shown in the ExecutionHandler javadoc, and the above is my interpretation of it.