4
votes

I am new to Netty and trying to writing a server client project using it. I am able to send the request from client to the server successfully and am able to process it too using my listeners. But the issue am having is when I try to write the response back to the channel for the client processing on the server side using channel.write(), I am unable to get it back at client.

This is my first time on stackoverflow, please forgive if I make some mistake while asking the question or for indentation issues while posting the code.

This is my server :

public class SocketIOServer {

private Logger log = Logger.getLogger(getClass());
private ServerBootstrap bootstrap;
private Channel serverChannel;
private int port;
private boolean running;

public SocketIOServer(int port) {
    this.port = port;
    this.running = false;
}

public boolean isRunning() {
    return this.running;
}

public void start() {
    bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
    try {
        EncryptionManager encryptionManager = EncryptionManager.getInstance();
    } catch (Exception ex) {
        java.util.logging.Logger.getLogger(SocketIOServer.class.getName()).log(Level.SEVERE, null, ex);
    }
    // Set up the event pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = pipeline();

            pipeline.addLast("decode", new FrameDecoder(){

                @Override
                protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
                    // Make sure if the length field was received.
                    if (buf.readableBytes() < 4) {
                        // The length field was not received yet - return null.
                        // This method will be invoked again when more packets are
                        // received and appended to the buffer.
                        return null;
                    }

                    // The length field is in the buffer.

                    // Mark the current buffer position before reading the length field
                    // because the whole frame might not be in the buffer yet.
                    // We will reset the buffer position to the marked position if
                    // there's not enough bytes in the buffer.
                    buf.markReaderIndex();

                    // Read the length field.
                    int length = buf.readInt();

                    // Make sure if there's enough bytes in the buffer.
                    if (buf.readableBytes() < length) {
                        // The whole bytes were not received yet - return null.
                        // This method will be invoked again when more packets are
                        // received and appended to the buffer.

                        // Reset to the marked position to read the length field again
                        // next time.
                        buf.resetReaderIndex();

                        return null;
                    }

                    // There's enough bytes in the buffer. Read it.
                    ChannelBuffer frame = buf.readBytes(length);

                    // Successfully decoded a frame.  Return the decoded frame.
                    return frame;
                }
            });
            pipeline.addLast("handler", new GameServerHandler());
            return pipeline;
        }
    });


    bootstrap.setOption("backlog", 1024);


    // Bind and start to accept incoming connections.
    this.serverChannel = bootstrap.bind(new InetSocketAddress(port));
    this.running = true;

    log.info("Server Started at port [" + port + "]");
    System.out.println("Server Started at port [" + port + "]");
}



public static void main(String[] args) {
    SocketIOServer server = new SocketIOServer(8888);
    server.start();
}

Server Handler class :

public class GameServerHandler extends SimpleChannelUpstreamHandler {


    @Override
      public void handleUpstream(
              ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
          if (e instanceof ChannelStateEvent) {
              logger.info(e.toString());
          }
          super.handleUpstream(ctx, e);
      }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
        System.out.println("Following data recieved at the server");
        ConnectionManager connectionManager = ConnectionManager.getInstance();
        List<Object> paramsList = new ArrayList<Object>();
        NetworkClient client =  connectionManager.getNetworkClient(e.getChannel().getId());
        ChannelBuffer ebuf = (ChannelBuffer)e.getMessage();
        if( client == null ) {
            client = new NetworkClient(e.getChannel());
            connectionManager.addNetworkClient(e.getChannel().getId(), client);
        } 

        byte [] encryptedData = ebuf.array();
        System.out.println("encrypted data size : "+ encryptedData.length);
        System.out.println("encrypted data : "+ encryptedData);
        byte [] decrpytedData = null;
        try {
            decrpytedData = EncryptionManager.getInstance().decrypt(encryptedData, EncryptionManager.getInstance().getPrivateKey());
        }catch (Throwable ee){
            ee.printStackTrace();
        }
        ChannelBuffer buf = ChannelBuffers.buffer(decrpytedData.length);
        buf.writeBytes(decrpytedData);

        while(buf.readable()) {
            long gameTableId = buf.readLong();

            GameTable gameTable = gameTableController.getTablePeer(gameTableId);
            if(gameTable == null) {

                GameTable newGameTable = new GameTable();
                newGameTable.setTableId(gameTableId);
                newGameTable.registerListeners();
                gameTableController.storeTablePeer(gameTableId, newGameTable);              
            }

            int eventHash = buf.readInt();
            String eventName = getEventNameFromEventHash(eventHash);
            int paramCount = buf.readInt();
            if(paramCount > 0) {
                for(int count=0;count<paramCount;count++) {
                    populateParamList(buf, paramsList);
                }


                if(!NetworkMessenger.broadcastToAllFromNetwork(eventName, client, paramsList.toArray(new Object[paramsList.size()]))) {
                    logger.debug( "Unhandled Data:" + eventName);
                    System.out.println("Unhandled Data:" + eventName);
                }
                logger.debug( "Data processed successfully for " + eventName + " game table id : " + gameTableId);
                System.out.println("Data processed successfully for " + eventName + " game table id : " + gameTableId);
            }

            break;
        }
    }




    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        logger.log(
                Level.WARN,
                "Unexpected exception from downstream.",
                e.getCause());
        Channel ch = e.getChannel();
        ch.close();
    }


    @Override
      public void channelConnected(
              ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {


        ChannelFuture cf = e.getFuture();
        cf.addListener(new Greeter());
      }

    @Override
      public void channelDisconnected(
              ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
          // Unregister the channel from the global channel list
          // so the channel does not receive messages anymore.
          channels.remove(e.getChannel());
      }

    private static final class Greeter implements ChannelFutureListener {

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Once session is secured, send a greeting.

                String welcomeMsg = "Welcome";      
                ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
                cb.writeBytes(welcomeMsg.getBytes());

                future.getChannel().write(cb);

                // Register the channel to the global channel list
                // so the channel received the messages from others.
                channels.add(future.getChannel());
            } else {
                future.getChannel().close();
            }
        }
    }
}

This is my client class :

public class Clientbot {


    public static void main(String[] args) throws IOException {
        String host = "localhost";
        int port = 8888;
        final ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
        // Configure the client.
        ChannelFactory factory =
                new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());

        ClientBootstrap bootstrap = new ClientBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {

                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decode", new FrameDecoder(){

                    @Override
                    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
                        // Make sure if the length field was received.
                        if (buf.readableBytes() < 4) {
                            return null;
                        }

                         buf.markReaderIndex();

                        // Read the length field.
                        int length = buf.readInt();

                        // Make sure if there's enough bytes in the buffer.
                        if (buf.readableBytes() < length) {
                            buf.resetReaderIndex();

                            return null;
                        }

                        // There's enough bytes in the buffer. Read it.
                        ChannelBuffer frame = buf.readBytes(length);

                        // Successfully decoded a frame.  Return the decoded frame.
                        return frame;
                    }
                });
                pipeline.addLast("handler", new ClientHandler());

                return pipeline;
            }
        });

        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);

        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // Wait until the connection attempt succeeds or fails.
        Channel channel = future.awaitUninterruptibly().getChannel();
        if (!future.isSuccess()) {
            future.getCause().printStackTrace();
            bootstrap.releaseExternalResources();
            return;
        }


         ChannelFuture lastWriteFuture = null;
            try {
                lastWriteFuture = writeSecureSampleData(channel, buf);
            } catch (Exception ex) {
                ex.printStackTrace();
            }


        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.awaitUninterruptibly();
        }

        // Close the connection.  Make sure the close operation ends because
        // all I/O operations are asynchronous in Netty.
        channel.close().awaitUninterruptibly();

        // Shut down all thread pools to exit.
        bootstrap.releaseExternalResources();


    }


    private static ChannelFuture writeSecureSampleData(Channel channel, ChannelBuffer buffer) throws Exception {
        long gameId = 1234;
        ChannelBuffer buf = ChannelBuffers.buffer(256);
        buf.writeLong(gameId);

        writeParamsForLogin(buf);



        byte [] data = buf.array();
        byte [] encryptedData = EncryptionManager.getInstance().encrypt(data, EncryptionManager.getInstance().getPublicKey());
        int size = encryptedData.length;
        buffer.writeInt(size);
        buffer.writeBytes(encryptedData);
        ChannelFuture writeFuture = channel.write(buffer);
        return writeFuture;
    }


    private static void writeParamsForLogin(ChannelBuffer buf) {
        int eventHash = getEventHash("Login");
        buf.writeInt(eventHash);

        int paramCount = 3 ; // in case of PlayerToken 2 parameters to be send : player   id + player token
        buf.writeInt(paramCount);
        // version , E , N 

        String version = "1.0";

        buf.writeInt(dataType_String);
        buf.writeInt(version.length());
        buf.writeBytes(version.getBytes());

        String E = "61"; 
        buf.writeInt(dataType_ByteArray);
        buf.writeInt(E.length());
        buf.writeBytes(E.getBytes());

        String N = "53"; 
        buf.writeInt(dataType_ByteArray);
        buf.writeInt(N.length());
        buf.writeBytes(N.getBytes());
    }
}

and the client handler class :

public class ClientHandler extends SimpleChannelUpstreamHandler {

@Override
public void messageReceived(
        ChannelHandlerContext ctx, MessageEvent e) {
    System.err.println(e.getMessage());
    System.out.println("Message Recieved");
    ChannelBuffer buf = (ChannelBuffer)e.getMessage();
    while (buf.readable()) {
        System.out.println((char) buf.readByte());
        System.out.flush();
    }
}

@Override
public void exceptionCaught(
        ChannelHandlerContext ctx, ExceptionEvent e) {
    logger.log(
            Level.WARNING,
            "Unexpected exception from downstream.",
            e.getCause());
    e.getCause().printStackTrace();
    e.getChannel().close();
}

}

On receiving the request data on the server, I am processing it and writing back the response using the following method :

private static boolean sendMessage(NetworkClient targetObject, String eventName, Object[] params) {
        logger.debug("Sending message for the event : " + eventName);
        if(targetObject == null) {
            sendMessageToAll(eventName, params);
        } else {
            if (targetObject.getClientChannel() == null) {
                logger.error("Target not defined.");
                return false;
            }
            Channel clientChannel = targetObject.getClientChannel();
            ChannelBuffer buf = ChannelBuffers.dynamicBuffer();

            long hash = getHashString(eventName);
            buf.writeInt(512);
            buf.writeLong(hash);
            if(params != null) {
                buf.writeInt(params.length);
                for(Object param : params) {
                     int type = getTypeOfObject(param);
                     buf.writeInt(type);
                     writeParamToBuffer(buf, type, param);
                }

            }
            ChannelFuture cf = null;
             try {

                cf = clientChannel.write(buf);
                if(cf.isSuccess()){
                    System.out.println("Written to client successfully");
                }

            } catch (Exception e) {
                logger.error("Error in broadcasting for event : " + eventName, e);
            } finally {

            }

        }

        return false;
    }

This code is still a work in progress, so you might find lot of "not required" stuff in there. I just wanted to show the logic that I am trying to use and want to know why it is not working.

I have taken help from examples at http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/ for writing this.

Thanks in advance for any help.

1
When you say you're unable to receive it at the client, do you know if the message is actually getting written to the network? Also, the way you check cf.isSuccess in sendMessage is also flawed. Depending on which thread you call clientChannel.write from, and how busy the network is, isSuccess may return false and then the buffer is written. If you're not on the I/O thread you could wait for the future to complete, otherwise you need to set a ChannelFutureListener.johnstlr
were you able to figure this out?ali haider

1 Answers

2
votes

Very complete code.

An easy way to write text to a websockets channel is to do this:

ChannelFuture writeFuture = channel.write(new TextWebSocketFrame("My Text here!");

You don't need to deal with the ChannelBuffers directly. That may resolve your problem.