2
votes

I'm setting up a client connection using OioClientSocketChannelFactory and a pipeline like this:

ClientBootstrap bootstrap = new ClientBootstrap(
    new OioClientSocketChannelFactory(Executors.newCachedThreadPool())
);

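// Channels.pipeline() creates the empty pipeline that the handlers are added to.
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("encoder", new MessageEncoder());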
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("manager", new FooClientManager());
BlockingReadHandler<Message> reader = new BlockingReadHandler<Message>();
pipeline.addLast("reader", reader);
bootstrap.setPipeline(pipeline);

The FooClientManager class (a SimpleChannelHandler) sends a handshake message on connect (SimpleChannelHandler.channelConnected) and is responsible for consuming the server's handshake reply (SimpleChannelHandler.messageReceived) without passing it up the pipeline. The user of the API should never see these low-level messages; consuming them is FooClientManager's job.

It is expected that after connecting, the user of the API would now construct a message and call channel.write() to send the message, and reader.read() to block awaiting a reply.

The problem I'm having is that FooClientManager has not yet seen the handshake reply when the user of the API calls channel.write(), because no read() has been performed at that point.

If I call reader.read() before calling channel.write(), it blocks indefinitely, because FooClientManager doesn't send the message I'm trying to read up the pipeline.

What is the best way to deal with blocking client IO and a channel handler in the pipeline that may consume messages?


2 Answers

1
votes

FooClientManager needs to signal the API user when the handshake has completed. One option is to have the API user implement an interface which is passed into FooClientManager's constructor. You can always create a default implementation of this interface that acts like a future and allows the API user to block until the handshake is complete.
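As a rough sketch of the first option, the callback and its future-like default implementation might look like the following. The names HandshakeListener and HandshakeFuture are hypothetical and not part of Netty; the assumption is that FooClientManager would call handshakeComplete() from messageReceived once it has consumed the handshake reply.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback that FooClientManager would invoke; not a Netty type. */
interface HandshakeListener {
    void handshakeComplete();
}

/** Default implementation that behaves like a one-shot future. */
final class HandshakeFuture implements HandshakeListener {
    private final CountDownLatch done = new CountDownLatch(1);

    @Override
    public void handshakeComplete() {
        done.countDown();
    }

    boolean isDone() {
        return done.getCount() == 0;
    }

    /**
     * Blocks until the handshake completes or the timeout elapses.
     * Returns false on timeout or if the waiting thread is interrupted.
     */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The API user would call await(...) after connecting and only then issue channel.write() followed by reader.read().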

Another option is to have FooClientManager send a custom message up the pipeline that signals the connection is ready. The API user then blocks on reader.read() until it receives the message.
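The second option can be modeled without Netty as below. This is only a sketch: ConnectionReady and HandshakeFilter are hypothetical names, the BlockingQueue stands in for the queue behind BlockingReadHandler, and the first inbound message is assumed to be the handshake reply. In the real pipeline the marker would have to be a type the reader accepts (here, a Message).

```java
import java.util.concurrent.BlockingQueue;

/** Hypothetical marker message; illustrative only, not a Netty type. */
final class ConnectionReady {
    static final ConnectionReady INSTANCE = new ConnectionReady();
    private ConnectionReady() {}
}

/** Swallows the handshake reply and forwards a "ready" marker in its place. */
final class HandshakeFilter {
    private final BlockingQueue<Object> upstream; // stand-in for the blocking reader's queue
    private boolean handshakeDone;

    HandshakeFilter(BlockingQueue<Object> upstream) {
        this.upstream = upstream;
    }

    void messageReceived(Object msg) {
        if (!handshakeDone) {
            // Assumption: the first inbound message is the handshake reply.
            handshakeDone = true;
            upstream.add(ConnectionReady.INSTANCE);
        } else {
            upstream.add(msg); // ordinary messages pass through unchanged
        }
    }
}
```

With this shape, the API user's first blocking read returns the marker, after which it is safe to write and read application messages.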

0
votes

Thanks for the answer johnstlr!

Both of those solutions work. The solution we implemented was to have FooClientManager queue write requests until the handshake is complete. Then it sends all the queued messages.

One problem we ran into is that if we queue all write requests until the handshake is complete, then the handshake message itself is queued when FooClientManager calls channel.write(), because the message is sent through the whole pipeline. We got around this by having FooClientManager construct a message event and send it directly to the next handler in the pipeline:

ctx.sendDownstream(new DownstreamMessageEvent(
    ctx.getChannel(),
    new DefaultChannelFuture(ctx.getChannel(), false),
    msg,
    e.getChannel().getRemoteAddress()));
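The queue-until-handshake approach, including the bypass for the handshake message itself, can be modeled without Netty roughly as follows. This is a sketch under assumptions, not our actual handler: Downstream and HandshakeWriteGate are hypothetical names, messages are plain strings, and Downstream stands in for the next handler that ctx.sendDownstream would reach.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the next downstream handler (e.g. the encoder). */
interface Downstream {
    void send(String msg);
}

/** Holds user writes until the handshake completes, then flushes them in order. */
final class HandshakeWriteGate {
    private final Downstream next;
    private final Queue<String> pending = new ArrayDeque<String>();
    private boolean handshakeDone;

    HandshakeWriteGate(Downstream next) {
        this.next = next;
    }

    /** Sends the handshake straight to the next handler, bypassing the queue. */
    synchronized void sendHandshake(String handshake) {
        next.send(handshake);
    }

    /** User writes are queued until the handshake reply has been consumed. */
    synchronized void write(String msg) {
        if (handshakeDone) {
            next.send(msg);
        } else {
            pending.add(msg);
        }
    }

    /** Called once the handshake reply arrives; flushes queued writes in FIFO order. */
    synchronized void handshakeComplete() {
        handshakeDone = true;
        String msg;
        while ((msg = pending.poll()) != null) {
            next.send(msg);
        }
    }
}
```

The separate sendHandshake path is the point of the design: if the handshake went through write(), it would sit in the queue waiting for a reply that can never arrive.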