1
votes

I'm new to Netty and I wrote based on an example I found a Netty http server, that keeps http connections open to send server-sent-events to the browser client.

Problem is that it only accepts up to about ~5 connections and after that blocks new connections. I googled and found most answers said to set SO_LOGBACK to a higher value. Tried different values and while I saw no difference. I even set it to MAX_INTEGER value and still had only 5 connections.

Server code (Using Netty version 4.1.6.Final):

package server;

import static io.netty.buffer.Unpooled.copiedBuffer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;

public class NettyHttpServer {
 private ChannelFuture channel;
 private final EventLoopGroup masterGroup;

 public NettyHttpServer() {
  masterGroup = new NioEventLoopGroup(100);
 }

 public void start() {
  try {
   final ServerBootstrap bootstrap = new ServerBootstrap().group(masterGroup)
    .channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer < SocketChannel > () {
     @Override
     public void initChannel(final SocketChannel ch) throws Exception {
      ch.pipeline().addLast("codec", new HttpServerCodec());
      ch.pipeline().addLast("aggregator", new HttpObjectAggregator(512 * 1024));
      ch.pipeline().addLast("request", new ChannelInboundHandlerAdapter() {
       @Override
       public void channelRead(final ChannelHandlerContext ctx, final Object msg)
       throws Exception {
        System.out.println(msg);
        registerToPubSub(ctx, msg);
       }

       @Override
       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
       }

       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
         HttpResponseStatus.INTERNAL_SERVER_ERROR,
         copiedBuffer(cause.getMessage().getBytes())));
       }
      });
     }
    }).option(ChannelOption.SO_BACKLOG, Integer.MAX_VALUE)
    .childOption(ChannelOption.SO_KEEPALIVE, true);
   channel = bootstrap.bind(8081).sync();
   // channels.add(bootstrap.bind(8080).sync());
  } catch (final InterruptedException e) {}
 }

 public void shutdown() {
  masterGroup.shutdownGracefully();

  try {
   channel.channel().closeFuture().sync();
  } catch (InterruptedException e) {}
 }

 private void registerToPubSub(final ChannelHandlerContext ctx, Object msg) {
  new Thread() {
   @Override
   public void run() {
    while (true) {
     final String responseMessage = "data:abcdef\n\n";
     FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
      copiedBuffer(responseMessage.getBytes()));

     response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
     response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/event-stream");
     response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
     response.headers().set("Cache-Control", "no-cache");

     ctx.writeAndFlush(response);

     try {
      Thread.sleep(1000);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   };
  }.start();
 }

 public static void main(String[] args) {
  new NettyHttpServer().start();
 }
}

Client js code (I run it more than 5 times from my browser in different tabs, and the not all of them get:

var source = new EventSource("http://localhost:8081");
source.onmessage = function(event) {
   console.log(event.data);
};
source.onerror= function(err){console.log(err); source.close()};
source.onopen = function(event){console.log('open'); console.log(event)}
1

1 Answers

2
votes

You need to let the browser know that you are done sending the response, and for that you have three options.

  1. Set a content length
  2. Send it chunked
  3. Close the connection when you are done

You aren't doing any of those. I suspect your browser is still waiting for the full response to each request you send, and is using a new connection for each request in your testing. After 5 requests your browser must be refusing to create new connections.

Another thing I noticed is that you are creating a new thread for each request in your server, and never letting it die. That will cause problems down the line as you try to scale. If you really want that code to run in a different thread then I suggest looking at overloaded methods for adding handlers to the pipeline; those should let you specify a thread pool to run them in.