
Below is the code of my Netty server. It is configured to release the reference count in channelRead, i.e. it won't process anything, it just drops the incoming data. The client is also Netty based; it starts 16 parallel connections to the server and starts sending data on each channel. However, as soon as the program starts, memory usage keeps increasing until it eventually crashes with the following exception.

08:41:15.789 [nioEventLoopGroup-3-1] WARN  i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 100663296 byte(s) of direct memory (used: 3602907136, max: 3698851840)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:754) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:231) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PoolArena.reallocate(PoolArena.java:397) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:118) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:285) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:265) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1079) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1072) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1062) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263) ~[sosagent.jar:1.0-SNAPSHOT]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [sosagent.jar:1.0-SNAPSHOT]

NettyServerHandler

public class AgentServerHandler extends ChannelInboundHandlerAdapter implements RequestListener {

        private Buffer buffer;
        private AgentToHost endHostHandler;
        private String remoteAgentIP;
        private int remoteAgentPort;

        private ChannelHandlerContext context;
        private float totalBytes;
        private long startTime;

        boolean called;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
            log.debug("New agent-side connection from agent {} at Port {}",
                    socketAddress.getHostName(),
                    socketAddress.getPort());

            this.context = ctx;

            remoteAgentIP = socketAddress.getHostName();
            remoteAgentPort = socketAddress.getPort();

            requestListenerInitiator.addRequestListener(this);
        //    Utils.router.getContext().getAttributes().put("agent-callback", requestListenerInitiator);

            StatCollector.getStatCollector().connectionAdded();

            startTime = System.currentTimeMillis();


        }

        private boolean isMineChannel(RequestTemplateWrapper request, AgentServerHandler handler) {
            return request.getPorts().contains(((InetSocketAddress) handler.context.channel().remoteAddress()).getPort());
        }


        /* Called whenever the AgentServer receives a new port request from the
           AgentClient; all open channels will be notified. */
        @Override
        public void newIncomingRequest(RequestTemplateWrapper request) {
            endHostHandler = getHostHandler(request);
            if (isMineChannel(request, this)) {
                endHostHandler.addChannel(this.context.channel());
                log.debug("Channel added for Client {}:{} Agent Port {}",
                        request.getRequest().getClientIP(),
                        request.getRequest().getClientPort(),
                        ((InetSocketAddress) this.context.channel().remoteAddress()).getPort());

                this.buffer = bufferManager.addBuffer(request, endHostHandler);
            }
            endHostHandler.setBuffer(buffer);
        }


        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Count the bytes before releasing: touching the buffer after
            // release() is invalid once its reference count drops to zero.
            totalBytes += ((ByteBuf) msg).capacity();
            ReferenceCountUtil.release(msg);
        }

}

Bootstrap

private boolean startSocket(int port) {
    group = new NioEventLoopGroup();
    AgentTrafficShaping ats = new AgentTrafficShaping(group, 5000);
    ats.setStatListener(this);
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<Channel>() {
                                  @Override
                                  protected void initChannel(Channel channel) throws Exception {
                                      channel.pipeline()
                                              .addLast("agent-traffic-shapping", ats)
                                              .addLast("lengthdecorder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                              // .addLast("bytesDecoder", new ByteArrayDecoder())
                                              .addLast(new AgentServerHandler())
                                              .addLast("4blength", new LengthFieldPrepender(4))
                                      //  .addLast("bytesEncoder", new ByteArrayEncoder())
                                      ;
                                  }
                              }
                );

        ChannelFuture f = b.bind().sync();
        log.info("Started agent-side server at Port {}", port);
        return true;
        // Need to do socket closing handling. close all the remaining open sockets
        //System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
        //f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("Error starting agent-side server");
        e.printStackTrace();
        return false;
    } finally {
        //group.shutdownGracefully().sync();
    }
}

What could be the possible cause here? I know Netty uses reference counting to keep track of its buffers, and I am releasing the reference as soon as I get a message, so that shouldn't be the problem!


1 Answer


There might be different reasons for an OOM exception. One that readily comes to mind is the AUTO_READ option on the channel, whose default value is true: Netty then keeps reading data off the socket as fast as it arrives, and if the handlers downstream (here, the traffic shaper and the frame decoder's cumulation buffer, which is exactly where your stack trace blows up) cannot keep up, the unprocessed bytes accumulate in direct memory.
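For instance, a minimal sketch of what that looks like (ManualReadHandler is a hypothetical stand-in for your AgentServerHandler, not code from the question): disable AUTO_READ for accepted child channels in the ServerBootstrap and request each read explicitly, so the socket is only drained once the previous message has been dealt with.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.util.ReferenceCountUtil;

// In the bootstrap: b.childOption(ChannelOption.AUTO_READ, false);

public class ManualReadHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        // With AUTO_READ off, the first read must be requested explicitly.
        ctx.read();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg);
        // Pull the next chunk only after the current one has been processed.
        ctx.read();
    }
}

With reads throttled this way, unread data stays in the kernel's receive buffer and TCP flow control pushes back on the 16 sending clients, instead of everything piling up in Netty's direct-memory pools.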

You can find more information about this in the Stack Overflow posts here and here.

If setting the AUTO_READ option doesn't help, Netty also provides a way to check whether any message passed to a ChannelHandler is not released: set the -Dio.netty.leakDetectionLevel=ADVANCED JVM option in the system properties.
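For example (a sketch; LeakDetectionSetup is a hypothetical name, and the programmatic call is an equivalent alternative I'm suggesting, not something from the original post):

import io.netty.util.ResourceLeakDetector;

public final class LeakDetectionSetup {
    public static void main(String[] args) {
        // Same effect as launching with the JVM flag, e.g.:
        //   java -Dio.netty.leakDetectionLevel=ADVANCED -jar sosagent.jar
        // Must run before any buffers are allocated.
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
    }
}

At the ADVANCED level, Netty samples allocations and logs the recent access points of any buffer that is garbage-collected without being released, which usually points straight at the leaking handler.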