3
votes

I'm having trouble getting Netty to work with UDP. The biggest problem is that once I make a connection to the server and finish the interaction between the server and client, the server becomes useless: I can't make another connection to it from the same client or from any other host. I feel like there is something really simple I'm missing. I have configured the server to create a new pipeline (I think?) for each new host that connects to it, with the following code:

public class DistinctChannelPipelineFactory implements ChannelPipelineFactory {

  private final ChannelPipelineFactory pipelineFactory;

  public DistinctChannelPipelineFactory(ChannelPipelineFactory pipelineFactory) {
    this.pipelineFactory = pipelineFactory;
  }

  @Override public ChannelPipeline getPipeline() throws Exception {
    return Channels.pipeline(new DistinctChannelPipelineHandler(pipelineFactory));
  }

}

DistinctChannelPipelineHandler looks like this; in it I try to create a different pipeline per remote host and time each one out after 10 seconds of inactivity.

  private final LoadingCache<SocketAddress, ChannelPipeline> pipelines;

  public DistinctChannelPipelineHandler(ChannelPipelineFactory factory) {
    this.pipelines = CacheBuilder.newBuilder()
        .concurrencyLevel(1)
        .expireAfterAccess(10, SECONDS)
        .removalListener(new PipelineRemovalListener())
        .build(new PipelineCacheLoader(factory));
  }

  public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
      final ChannelPipeline pipeline = pipelines.get(((MessageEvent) e).getRemoteAddress());
      if (!pipeline.isAttached()) {
        pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
        pipeline.sendUpstream(new UpstreamChannelStateEvent(ctx.getChannel(), OPEN, TRUE));
      }
      pipeline.sendUpstream(e);
    }

    if (e instanceof ChannelStateEvent) {
      for (final ChannelPipeline pipeline : pipelines.asMap().values()) {
        final ChannelStateEvent cse = (ChannelStateEvent) e;
        pipeline.sendUpstream(new UpstreamChannelStateEvent(ctx.getChannel(), cse.getState(), cse.getValue()));
      }
    }
  }

  public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    if (e instanceof MessageEvent) {
      final ChannelPipeline pipeline = pipelines.get(((MessageEvent) e).getRemoteAddress());
      if (!pipeline.isAttached()) {
        pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
      }
      pipeline.sendDownstream(e);
    } else {
      ctx.sendDownstream(e);
    }
  }

  private static final class PipelineCacheLoader extends CacheLoader<SocketAddress, ChannelPipeline> {

    private final ChannelPipelineFactory factory;

    public PipelineCacheLoader(ChannelPipelineFactory factory) {
      this.factory = factory;
    }

    @Override
    public ChannelPipeline load(SocketAddress key) throws Exception {
      return factory.getPipeline();
    }
  }

  private static final class PipelineRemovalListener implements RemovalListener<SocketAddress, ChannelPipeline> {

    private static final Logger logger = LoggerFactory.getLogger(PipelineRemovalListener.class);

    @Override
    public void onRemoval(RemovalNotification<SocketAddress, ChannelPipeline> n) {
      logger.info("UDP connection timed out, removing connection for {}", n.getKey());
      n.getValue().sendUpstream(new UpstreamChannelStateEvent(n.getValue().getChannel(), OPEN, FALSE));
    }
  }

This is how I'm initializing the server:

  @Provides
  public ConnectionlessBootstrap getConnectionlessBootstrap(DatagramChannelFactory channelFactory,
                                                            @LocalAddress SocketAddress localAddress,
                                                            final UdpPipelineFactory pipelineFactory) {

    final ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(channelFactory);
    bootstrap.setOption("localAddress", localAddress);
    bootstrap.setPipelineFactory(new DistinctChannelPipelineFactory(pipelineFactory));
    return bootstrap;
  }

  @Provides
  @Singleton
  public DatagramChannelFactory getDatagramChannelFactory(@WorkerExecutor Executor worker) {
    final DatagramChannelFactory channelFactory = new NioDatagramChannelFactory(worker);
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override public void run() {
        channelFactory.releaseExternalResources();
      }
    });
    return channelFactory;
  }

I have omitted where I actually add all my handlers, as I didn't think that's where the problem lies. Am I missing something fundamental here? I just want a pipeline per unique remote address that times out. It's awfully frustrating firing up the server and having it literally work for only one client/server interaction! I have verified through debugging that once I hit it with additional requests it does NOT create a new pipeline. So it seems like the original pipeline is staying around in a very stale state, which is why it won't accept any other requests. Thoughts? Suggestions?

1 Answer

2
votes

I made a fundamental mistake. With ConnectionlessBootstrap everything runs over the same channel, and we were closing the channel after each call to the server, thus disabling UDP entirely. Closing the channel after a response is what our TCP code does, and it took a while to realize that UDP works differently. Hope this saves someone else some time and headaches.
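
For anyone who wants to see the effect in isolation, here is a minimal sketch using only the JDK's DatagramSocket. It is not Netty code, and the class and method names (SharedUdpSocketDemo, startEchoServer, request, exchange) are made up for the illustration. It assumes a loopback echo server on an ephemeral port. One bound UDP socket answers every peer, so closing it after the first reply (the TCP habit) leaves the second client with no response.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class SharedUdpSocketDemo {

    /** Starts an echo loop on the given socket; optionally closes the socket after the first reply. */
    static Thread startEchoServer(final DatagramSocket server, final boolean closeAfterReply) {
        Thread t = new Thread(() -> {
            byte[] buf = new byte[512];
            try {
                while (true) {
                    DatagramPacket in = new DatagramPacket(buf, buf.length);
                    server.receive(in);
                    // Reply to whoever sent the datagram; there is no per-client socket in UDP.
                    server.send(new DatagramPacket(in.getData(), in.getLength(), in.getSocketAddress()));
                    if (closeAfterReply) {
                        server.close(); // the TCP-style mistake: this shuts the server down for everyone
                        return;
                    }
                }
            } catch (Exception e) {
                // The socket was closed; the echo loop ends here.
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Sends one datagram from a fresh client socket; returns the echoed text, or null if nothing came back. */
    static String request(int port, String text) throws IOException {
        try (DatagramSocket client = new DatagramSocket()) {
            client.setSoTimeout(1000);
            byte[] out = text.getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(out, out.length, InetAddress.getLoopbackAddress(), port));
            DatagramPacket in = new DatagramPacket(new byte[512], 512);
            try {
                client.receive(in);
            } catch (IOException e) {
                return null; // timed out (or the port was reported unreachable): the server is gone
            }
            return new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8);
        }
    }

    /** Runs two requests from two different client sockets against one server socket. */
    static String[] exchange(boolean closeAfterReply) throws IOException {
        DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress());
        try {
            startEchoServer(server, closeAfterReply);
            int port = server.getLocalPort();
            return new String[] { request(port, "first"), request(port, "second") };
        } finally {
            server.close(); // closing an already closed socket is harmless
        }
    }

    public static void main(String[] args) throws IOException {
        String[] open = exchange(false);
        System.out.println("socket kept open:   " + open[0] + ", " + open[1]);
        String[] closed = exchange(true);
        System.out.println("closed after reply: " + closed[0] + ", " + closed[1]);
    }
}
```

In Netty 3 terms, the equivalent fix is simply not to close the channel (for example via a close listener on the write future) in the handlers that sit on the datagram channel; per-client cleanup has to be done some other way, such as the cache expiry in the question.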