I am trying to write a proxy server using netty v4.0.30. I have gone through the proxy example included in the release (http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html). However, my requirement is a bit different.
In my case there can be multiple servers behind my netty instance, so I cannot create a client bootstrap directly in the channelActive method. My client essentially sends two requests (both TCP) to my netty server:
Request 1: Connect to backend server A at port X. At this point I should be able to open a connection to my backend server and reply with Success as the response to the client.
Request 2: The actual data, written by the client on the same socket, which netty would forward to the backend server.
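To make the first request concrete, here is a minimal sketch of how the connect message could be parsed on its own, assuming the CONNECT-<IP>-<PORT> text format that my handler uses. The class name ConnectRequest and its parse method are hypothetical and not part of netty. Unlike a plain split on "-", the sketch takes the port from the last "-", so a host name containing hyphens would still parse.

```java
// Hypothetical helper (not part of netty): parses the first request,
// assumed to have the form CONNECT-<IP>-<PORT>, optionally followed by a newline.
final class ConnectRequest {
    final String host;
    final int port;

    private ConnectRequest(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static ConnectRequest parse(String line) {
        String trimmed = line.trim();
        String prefix = "CONNECT-";
        if (!trimmed.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a connect request: " + trimmed);
        }
        // The port follows the last '-', so hyphens inside the host are tolerated.
        int lastDash = trimmed.lastIndexOf('-');
        if (lastDash < prefix.length()) {
            throw new IllegalArgumentException("Missing port: " + trimmed);
        }
        String host = trimmed.substring(prefix.length(), lastDash);
        if (host.isEmpty()) {
            throw new IllegalArgumentException("Missing host: " + trimmed);
        }
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        int port = Integer.parseInt(trimmed.substring(lastDash + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new ConnectRequest(host, port);
    }
}
```

In a handler, the result of ConnectRequest.parse(connectDest) could supply the host and port passed to the bootstrap's connect call, and a thrown exception could be treated as a reason to close the client connection.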
These two requests are needed because there can be many backend servers. Since I am still learning netty, any tips regarding this would be of great help.
Thanks in advance.
EDIT:
Here is my handler code, which is able to connect to multiple backend servers as specified in the first request:
Inbound channel handler
public class TunnelInboundHandler extends ChannelInboundHandlerAdapter {

    // objects for client bootstrap and outbound channel
    private Bootstrap b = new Bootstrap()
            .group(new NioEventLoopGroup(1))
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.AUTO_READ, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000)
            .option(ChannelOption.SO_SNDBUF, 1048576)
            .option(ChannelOption.SO_RCVBUF, 1048576)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    private volatile Channel outboundChannel;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        // to differentiate between the request to connect and the actual data
        Attribute<Boolean> connected = ctx.attr(isConnected);
        // to store the outbound channel object
        Attribute<Channel> channelContx = ctx.attr(channelContext);

        // the first request is of the format CONNECT-<IP>-<PORT>
        if (connected.get() == null) {
            ByteBuf in = (ByteBuf) msg;
            String connectDest = "";
            try {
                while (in.isReadable()) {
                    connectDest = connectDest + (char) in.readByte();
                    System.out.flush();
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
            String[] connectDestArr = connectDest.split("-");
            b.channel(ctx.channel().getClass());
            b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));
            ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));
            outboundChannel = f.channel();
            channelContx.set(outboundChannel);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // connection complete, start to read the first data
                        ctx.channel().read();
                    } else {
                        // close the connection if the connection attempt has failed
                        ctx.channel().close();
                    }
                }
            });
            // respond with Success to the client so that the actual request is sent
            String response = "SUCCESS\n";
            ByteBuf res = ctx.alloc().buffer(response.length());
            res.writeBytes(response.getBytes());
            ctx.write(res);
            ctx.flush();
            // set connected to true to mark completion of the first request
            connected.set(true);
        } else if (connected.get()) {
            if (channelContx.get().isActive()) {
                channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            // was able to flush out data, start to read the next chunk
                            ctx.channel().read();
                        } else {
                            future.cause().printStackTrace();
                            future.channel().close();
                        }
                    }
                });
            } else {
                // System.out.println("Outbound Channel Not Active");
            }
        }
    }
}
Outbound channel handler
// The class declaration and field are assumed; the snippet as posted started at the constructor.
public class OutBoundTargetHandler extends ChannelInboundHandlerAdapter {

    // client-facing channel that backend data is relayed to
    private final Channel inboundChannel;

    public OutBoundTargetHandler(Channel inboundChannel) {
        // System.out.println("Initlizing target pool");
        this.inboundChannel = inboundChannel;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // System.out.println("Activating Chanel");
        ctx.read();
        ctx.write(Unpooled.EMPTY_BUFFER);
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        // System.out.println("Receving data");
        inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // data relayed to the client, read the next chunk from the backend
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // backend closed: flush pending writes to the client, then close it
        if (inboundChannel.isActive()) {
            inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
Everything works as expected; the only issue is that my client bootstrap is not closed after a request finishes, so the thread count increases by one for each request. Any tips on this?