3
votes

EDIT: I created a GitHub repo: https://github.com/istiillyes/client-server-netty

I've created a client-server using netty 4.0.15.Final and performed some tests using both OIO and NIO.

I'm sending some Strings, with varying sizes [1KB, 10KB, 100KB].

I need the server and client to be able to send messages in parallel, so the test looks like this:

  1. Start server (create channel to accept connections)
  2. Start client (create channel that connects to server)
  3. Send 100 messages from client to server (and vice versa), when channel becomes active.

Using NIO, the messages are transmitted, and everything works fine.

Using OIO, both server and client remain blocked in java.net.SocketOutputStream.write(byte[]) after some time, and never return.

Any idea why this happens? Is there something wrong in how I'm using netty?

I did this same test using plain java sockets, and it worked. So, I'm guessing either I don't use netty properly or this is a bug.

Below is the code where I create the channels, along with the channel handlers.

This is the stack trace from client, captured using YourKit:

pool-1-thread-1 [RUNNABLE, IN_NATIVE]
java.net.SocketOutputStream.write(byte[])
io.netty.buffer.UnpooledUnsafeDirectByteBuf.getBytes(int, OutputStream, int)
io.netty.buffer.AbstractByteBuf.readBytes(OutputStream, int)
io.netty.channel.oio.OioByteStreamChannel.doWriteBytes(ByteBuf)
io.netty.channel.oio.AbstractOioByteChannel.doWrite(ChannelOutboundBuffer)
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()
io.netty.channel.AbstractChannel$AbstractUnsafe.flush()
io.netty.channel.DefaultChannelPipeline$HeadHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.handler.logging.LoggingHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.write(Object, boolean, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object)
io.netty.channel.DefaultChannelPipeline.writeAndFlush(Object)
io.netty.channel.AbstractChannel.writeAndFlush(Object)
client.ClientHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.handler.logging.LoggingHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.DefaultChannelPipeline.fireChannelActive()
io.netty.channel.oio.AbstractOioChannel$DefaultOioUnsafe.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline$HeadHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelDuplexHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.handler.logging.LoggingHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline.connect(SocketAddress, ChannelPromise)
io.netty.channel.AbstractChannel.connect(SocketAddress, ChannelPromise)
io.netty.bootstrap.Bootstrap$2.run()
io.netty.channel.ThreadPerChannelEventLoop.run()
io.netty.util.concurrent.SingleThreadEventExecutor$2.run()
java.lang.Thread.run()

Code that creates the acceptor channel:

final class ServerChannelFactory {

    private static final Logger LOGGER = Logger.getLogger(ServerChannelFactory.class);

    protected static Channel createAcceptorChannel(
            final ChannelType channelType,
            final InetSocketAddress localAddress,
            final ServerHandler serverHandler
    ) {
        final ServerBootstrap serverBootstrap = ServerBootstrapFactory.createServerBootstrap(channelType);

        serverBootstrap
                .childHandler(new ServerChannelInitializer(serverHandler))
                .option(ChannelOption.SO_BACKLOG, Resources.SO_BACKLOG);

        try {
            ChannelFuture channelFuture = serverBootstrap.bind(
                    new InetSocketAddress(localAddress.getPort())).sync();
            channelFuture.awaitUninterruptibly();
            if (channelFuture.isSuccess()) {
                return channelFuture.channel();

            } else {
                LOGGER.warn(String.format("Failed to open socket! Cannot bind to port: %d!",
                        localAddress.getPort()));
            }
        } catch (InterruptedException e) {
            LOGGER.error("Failed to create acceptor socket.", e);
        }
        return null;
    }

    private static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

        private ChannelHandler serverHandler;

        private ServerChannelInitializer(ChannelHandler serverHandler) {
            this.serverHandler = serverHandler;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Encoders
            ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
                    new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));

            // Decoders
            ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
                    new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
                            Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
                            Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));

            // Handlers
            ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
            ch.pipeline().addLast(Resources.SERVER_HANDLER_NAME, serverHandler);
        }
    }
}

Server Handler:

final class ServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = Logger.getLogger(ServerHandler.class);
    int noMessagesReceived = 0;

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < Resources.NO_MESSAGES_TO_SEND; i++) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(i));
        }
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
        noMessagesReceived++;
        if(noMessagesReceived == Resources.NO_MESSAGES_TO_SEND) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(0));
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        FileUtils.write(Resources.SERVER_OUTPUT, String.format("Received %d messages", noMessagesReceived));
    }

    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
    }
}

Server Bootstrap Factory:

public class ServerBootstrapFactory {

    private ServerBootstrapFactory() {
    }

    public static ServerBootstrap createServerBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        switch (channelType) {
            case NIO:
                serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup());
                serverBootstrap.channel(NioServerSocketChannel.class);
                return serverBootstrap;

            case OIO:
                serverBootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup());
                serverBootstrap.channel(OioServerSocketChannel.class);
                return serverBootstrap;

            default:
                throw new UnsupportedOperationException("Failed to create ServerBootstrap,  " + channelType + " not supported!");
        }
    }
}

Code that creates the connector channel:

final class ClientChannelFactory {

    private static final Logger LOGGER = Logger.getLogger(ClientChannelFactory.class);

    protected static Channel createConnectorChannel(
            ChannelType channelType,
            final ClientHandler clientHandler,
            InetSocketAddress remoteAddress
    ) {
        final Bootstrap bootstrap = BootstrapFactory.createBootstrap(channelType);

        bootstrap.handler(new ClientChannelInitializer(clientHandler));

        try {
            final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(remoteAddress.getAddress(), remoteAddress.getPort()))
                    .sync();
            channelFuture.awaitUninterruptibly();
            if (channelFuture.isSuccess()) {
                return channelFuture.channel();

            } else {
                LOGGER.warn(String.format(
                        "Failed to open socket! Cannot connect to ip: %s port: %d!",
                        remoteAddress.getAddress(), remoteAddress.getPort())
                );
            }
        } catch (InterruptedException e) {
            LOGGER.error("Failed to open socket!", e);
        }
        return null;
    }

    private static class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

        private ChannelHandler clientHandler;

        private ClientChannelInitializer(ChannelHandler clientHandler) {
            this.clientHandler = clientHandler;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Encoders
            ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
                    new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));

            // Decoders
            ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
            ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
                    new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
                            Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
                            Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));

            // Handlers
            ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
            ch.pipeline().addLast(Resources.CLIENT_HANDLER_NAME, clientHandler);
        }
    }
}

Client Handler:

public final class ClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = Logger.getLogger(ClientHandler.class);
    private int noMessagesReceived = 0;

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < Resources.NO_MESSAGES_TO_SEND; i++) {
            ctx.channel().writeAndFlush(MessageStore.getMessage(i));
        }
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        noMessagesReceived++;
        if (noMessagesReceived > Resources.NO_MESSAGES_TO_SEND) {
            ctx.channel().close();
        }
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        FileUtils.write(Resources.CLIENT_OUTPUT, String.format("Received %d messages", noMessagesReceived));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
    }
}

Bootstrap Factory:

public class BootstrapFactory {

    private BootstrapFactory() {
    }

    public static Bootstrap createBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
        Bootstrap bootstrap = new Bootstrap();

        switch (channelType) {
            case NIO:
                bootstrap.group(new NioEventLoopGroup());
                bootstrap.channel(NioSocketChannel.class);
                return bootstrap;

            case OIO:
                bootstrap.group(new OioEventLoopGroup());
                bootstrap.channel(OioSocketChannel.class);
                return bootstrap;

            default:
                throw new UnsupportedOperationException("Failed to create Bootstrap,  " + channelType + " not supported!");
        }
    }
}

Channel Type:

public enum ChannelType {

    // New IO - non-blocking
    NIO,

    // Old IO - blocking
    OIO;
}
I would like to try this code. How do I run the client? - Kishore
You can find a pretty good tutorial about netty here - Illyes Istvan
Thank you. I'll definitely go through the tutorial. One simple question: how can I create an OIO UDT channel? I couldn't find an appropriate channel for it. I already have a working NIO UDT channel, but now I want OIO UDT. - Kishore
Did you mean UDP? You could use OioDatagramChannel. - Illyes Istvan
No, there is a protocol called UDT. It is supported by Netty. UDT is built on UDP. - Kishore

2 Answers

4
votes

Because Netty's OIO transport performs reads and writes in the same thread, it does not read while a write is in progress.

The problem is that if both client and server are implemented with the OIO transport, they can end up writing to each other at the same time, each blocked in write() and waiting for the other to read what it has written.
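
To illustrate why draining input on its own thread avoids this mutual wait, here is a minimal sketch using plain java.net sockets rather than Netty. The class name, message count and message size are assumptions chosen for the example; it is not the code from the question.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FullDuplexDemo {

    // Hypothetical sizes, picked so that one burst is far larger than typical socket buffers.
    static final int MESSAGES = 100;
    static final int MESSAGE_SIZE = 100 * 1024;

    /** Writes MESSAGES * MESSAGE_SIZE bytes, then signals end of output to the peer. */
    static void writeAll(Socket socket) throws IOException {
        OutputStream out = socket.getOutputStream();
        byte[] message = new byte[MESSAGE_SIZE];
        for (int i = 0; i < MESSAGES; i++) {
            out.write(message); // may block until the peer reads
        }
        out.flush();
        socket.shutdownOutput();
    }

    /** Reads until the peer shuts down its output and returns the number of bytes read. */
    static long readAll(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        byte[] buffer = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            total += n;
        }
        return total;
    }

    /** Runs both peers over loopback; returns {bytes read by client, bytes read by server}. */
    static long[] exchange() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try (ServerSocket listener = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(listener.getInetAddress(), listener.getLocalPort());
             Socket server = listener.accept()) {
            // Each side gets a dedicated reader thread, so reading continues while writing.
            Future<Long> clientRead = pool.submit(() -> readAll(client));
            Future<Long> serverRead = pool.submit(() -> readAll(server));
            Future<?> clientWrite = pool.submit(() -> { writeAll(client); return null; });
            Future<?> serverWrite = pool.submit(() -> { writeAll(server); return null; });
            clientWrite.get();
            serverWrite.get();
            return new long[] {clientRead.get(), serverRead.get()};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        long[] received = exchange();
        System.out.println("client read " + received[0] + " bytes, server read " + received[1] + " bytes");
    }
}
```

Because each side always has a thread draining its input, neither write can stay blocked indefinitely. If both sides instead called writeAll before readAll on a single thread, a burst this large would likely stall in the same way as the OIO test.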

The workaround is 1) to use NIO for at least one side, or 2) to be extremely careful not to fill the peer's socket receive buffer up to its max size. Practically, (2) isn't very easy to achieve, so it's always recommended to use the NIO transport at least for the server side.
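
To get a feel for option (2), the following sketch estimates whether a burst of messages could sit entirely in the kernel buffers while the peer is not reading. The 4-byte length prefix, the 64 KB buffer sizes, and the assumption that a 1 KB string encodes to 1024 bytes are all illustrative; real capacities depend on the operating system and on the SO_SNDBUF and SO_RCVBUF settings, so treat the result as a rough estimate rather than a guarantee.

```java
final class BurstSizeCheck {

    /** Bytes on the wire for a burst of length-prefixed messages. */
    static long burstBytes(int messages, int payloadBytes, int lengthFieldBytes) {
        return (long) messages * (payloadBytes + lengthFieldBytes);
    }

    /** Rough check: can the whole burst be buffered without the peer reading any of it? */
    static boolean fitsInBuffers(long burstBytes, int sendBufferBytes, int peerReceiveBufferBytes) {
        return burstBytes <= (long) sendBufferBytes + peerReceiveBufferBytes;
    }

    public static void main(String[] args) {
        int sendBuffer = 64 * 1024;        // assumed SO_SNDBUF of the writer
        int peerReceiveBuffer = 64 * 1024; // assumed SO_RCVBUF of the peer
        int lengthField = 4;               // assumed frame length field size

        for (int payload : new int[] {1024, 10 * 1024, 100 * 1024}) {
            long burst = burstBytes(100, payload, lengthField);
            System.out.println(payload + "-byte messages: burst of " + burst + " bytes, fits = "
                    + fitsInBuffers(burst, sendBuffer, peerReceiveBuffer));
        }
    }
}
```

Under these assumed numbers only the 1 KB burst fits, which is one reason keeping every burst below the buffer limits is hard to rely on in practice.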

1
votes

write() blocks when the sender is way ahead of the receiver. It's not a good idea to combine blocking and non-blocking I/O like this.
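
The blocking behaviour can be reproduced without any networking. The sketch below is only an analogy: a java.io pipe with a small, fixed buffer stands in for the socket buffers, and the class name and sizes are made up for the example.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockedWriteDemo {

    /** Returns {bytes written while nobody was reading, bytes written once the pipe was drained}. */
    static int[] run(int pipeCapacity, int totalBytes) throws Exception {
        PipedInputStream in = new PipedInputStream(pipeCapacity);
        PipedOutputStream out = new PipedOutputStream(in);
        AtomicInteger written = new AtomicInteger();

        Thread sender = new Thread(() -> {
            try (PipedOutputStream o = out) {
                for (int i = 0; i < totalBytes; i++) {
                    o.write(0); // blocks once the pipe buffer is full
                    written.incrementAndGet();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        sender.start();

        // Give the sender time to fill the buffer; nobody is reading yet.
        Thread.sleep(500);
        int beforeReading = written.get();

        // Start reading: the sender is released and eventually finishes.
        byte[] buffer = new byte[1024];
        while (in.read(buffer) != -1) {
            // discard the data
        }
        sender.join();
        return new int[] {beforeReading, written.get()};
    }

    public static void main(String[] args) throws Exception {
        int[] result = run(1024, 4096);
        System.out.println("written before reading: " + result[0] + ", written in total: " + result[1]);
    }
}
```

The sender stalls as soon as it is one buffer ahead of the receiver and only makes progress again once something reads from the other end.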