1
votes

I want to read a message at a specific position in a class other than my InboundHandler. I can't find a way to read it except in the channelRead0 method, which is called by the Netty framework.

For example:

context.writeMessage("message");
String msg = context.readMessage();

If this is not possible, how can I map a result that I get in the channelRead0 method to a specific call I made in another class?


1 Answer

2
votes

The Netty framework is designed to be driven asynchronously. With this model, it can handle a large number of connections with very few threads. If you are creating an API that uses Netty to dispatch calls to a remote location, you should use the same asynchronous model for your own calls.

Instead of making your API return the value directly, make it return a Future<?> or a Promise<?>. There are different ways of implementing this in your application. The simplest is a custom handler that keeps the pending Promises in a FIFO queue and completes the oldest one each time a response arrives. This only works if the remote side answers requests in the order they were sent.
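The FIFO idea can be sketched without Netty at all. In the minimal sketch below, CompletableFuture stands in for Netty's Promise, and PendingReplies, register and complete are hypothetical names chosen for illustration, not Netty API. It assumes the remote side replies in request order.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper, not part of Netty: pairs each reply with the
// oldest request that has not been answered yet.
final class PendingReplies {
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Call when a request is written; the returned future completes with its reply.
    CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.add(reply);
        return reply;
    }

    // Call from the read callback (channelRead0 in Netty) for every decoded reply.
    // Returns false if no request was waiting for a reply.
    boolean complete(String msg) {
        CompletableFuture<String> reply = pending.poll();
        if (reply == null) {
            return false;
        }
        reply.complete(msg);
        return true;
    }
}
```

Registering the future and writing the request have to happen as one step relative to other senders, otherwise the queue order and the wire order can diverge.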

An example of this could be the following:

This is heavily based on this answer that I submitted in the past.

We start with our handler, which maps the incoming responses to the pending requests in our pipeline:

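import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import java.util.Queue;

public class MyLastHandler extends SimpleChannelInboundHandler<String> {
    // Must be a queue that can hold elements, e.g. a ConcurrentLinkedQueue.
    // A SynchronousQueue has no capacity, so offer() and remove() would fail here.
    private final Queue<Promise<String>> queue;

    public MyLastHandler(Queue<Promise<String>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) {
        // Assumes responses arrive in request order, so the oldest promise matches this message.
        this.queue.remove().setSuccess(msg);
        // Or setFailure(Throwable)
    }
}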

We then need a method for sending commands to the remote server:

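Channel channel = ....;
// Any queue with capacity, e.g. a ConcurrentLinkedQueue (not a SynchronousQueue).
Queue<Promise<String>> queue = ....;

public Future<String> sendCommandAsync(String command) {
    // In Netty 4.x the no-arg DefaultPromise constructor is not public,
    // so let the channel's event loop create the promise.
    return sendCommandAsync(command, channel.eventLoop().<String>newPromise());
}

public Future<String> sendCommandAsync(String command, Promise<String> promise) {
    // Enqueue and write under one lock so the queue order matches the write order.
    synchronized(channel) {
        queue.offer(promise);
        channel.write(command);
    }
    channel.flush();
    return promise;
}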

Now that we have our methods, we need a way to call them:

sendCommandAsync("USER anonymous",
    channel.eventLoop().<String>newPromise().addListener(
        (Future<String> f) -> {
            if (!f.isSuccess()) {
                // handle f.cause()
                return;
            }
            String response = f.getNow();
            if (response.startsWith("331")) {
                // do something
            }
            // etc
        }
    )
);

If the caller would like to use our API as a blocking call, they can do that as well:

String response = sendCommandAsync("USER anonymous").get();
if (response.startsWith("331")) {
    // do something
}
// etc

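Notice that Future.get() can throw an InterruptedException if the calling thread is interrupted, unlike a blocking socket read, which can only be cancelled by some interaction on the socket. It also throws an ExecutionException if the promise was failed. Neither should be a problem inside the FutureListener, because the future is already complete by the time the listener runs.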
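For the blocking variant, a bounded wait is usually safer than a bare get(). The following is a minimal sketch under stated assumptions: BlockingReply and awaitReply are hypothetical names, and returning null on timeout or failure is a simplification for illustration. Netty's Future extends java.util.concurrent.Future, so the timed get() used here is available on the value returned by sendCommandAsync as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for callers that want a blocking call with a deadline.
final class BlockingReply {
    // Waits up to timeoutMillis for the reply.
    // Returns null on timeout, failure, or interruption (a simplification).
    static String awaitReply(Future<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }
}
```

A real API would probably surface the failure cause instead of collapsing every error into null.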