
I am trying to write a proxy server using Netty v4.0.30. I have gone through the proxy example included in the release (http://netty.io/4.0/xref/io/netty/example/proxy/package-summary.html). However, my requirement is a bit different.

In my case there can be multiple servers behind my Netty instance, so I cannot create a client bootstrap directly in the channelActive method. My client essentially sends two requests (both TCP) to my Netty server:

Request 1: Connect to backend server A at port X. At this point I should be able to open a connection to that backend server and reply with Success as the response to the client.

Request 2: The actual data, written by the client on the same socket, which Netty should forward to the backend server.

Two requests are needed because there can be many backend servers. I am still learning Netty, so any tips would be a great help.

Thanks in advance.

EDIT:

Here is my handler code, which is able to connect to multiple backend servers as specified in the first request:

Inbound channel handler

public class TunnelInboundHandler extends ChannelInboundHandlerAdapter {
// objects for client bootstrap and outbound channel
private Bootstrap b = new Bootstrap()
        .group(new NioEventLoopGroup(1))
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.AUTO_READ, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000)
        .option(ChannelOption.SO_SNDBUF, 1048576)
        .option(ChannelOption.SO_RCVBUF, 1048576)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

private volatile Channel outboundChannel;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    // to differentiate between request to connect and actual data
    Attribute<Boolean> connected = ctx.attr(isConnected);

    // to store outbound channel object
    Attribute<Channel> channelContx = ctx.attr(channelContext);
    // first request is of format - CONNECT-<IP>-<PORT>
    if(connected.get() == null)
    {
        ByteBuf in = (ByteBuf) msg;
        String  connectDest = "";
        try {
            while (in.isReadable()) { 
                connectDest = connectDest + (char) in.readByte();
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg); 
        }
        String[] connectDestArr = connectDest.split("-");
        b.channel(ctx.channel().getClass());
        b.handler(new NettyTargetHandlerInitilizer(ctx.channel()));

        ChannelFuture f = b.connect(connectDestArr[1].trim(), Integer.parseInt(connectDestArr[2].trim()));

        outboundChannel = f.channel();
        channelContx.set(outboundChannel);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    ctx.channel().read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    ctx.channel().close();
                }
            }
        });

    // response Success to client so that Actual request is sent    
        String response = "SUCCESS\n";
        ByteBuf res = ctx.alloc().buffer(response.length());
        res.writeBytes(response.getBytes());
        ctx.write(res);
        ctx.flush();
    // set connected as true to identify first request completion
        connected.set(true);
    }else if(connected.get()){
        if (channelContx.get().isActive()) {
            channelContx.get().writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.cause().printStackTrace();
                        future.channel().close();
                    }
                }
            });
        } else {
            // System.out.println("Outbound Channel Not Active");
        }
    }

}
}
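
The connect-request parsing done inline in channelRead above can be pulled into a small helper that validates the CONNECT-<IP>-<PORT> line before any connection is attempted. The following is only a sketch: ConnectRequest and its fields are hypothetical names, not part of Netty, and it assumes the whole request arrives in one read and is plain ASCII. It splits on the first and last hyphen, so a host name that itself contains hyphens would still parse (an extension beyond the IP-only format described in the question).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Netty class): parses "CONNECT-<host>-<port>".
final class ConnectRequest {
    final String host;
    final int port;

    private ConnectRequest(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Assumes the complete request line is contained in raw and is ASCII.
    static ConnectRequest parse(byte[] raw) {
        String line = new String(raw, StandardCharsets.US_ASCII).trim();
        int first = line.indexOf('-');
        int last = line.lastIndexOf('-');
        // Need two distinct separators and the literal CONNECT keyword.
        if (first < 0 || first == last || !"CONNECT".equals(line.substring(0, first))) {
            throw new IllegalArgumentException("Malformed connect request: " + line);
        }
        String host = line.substring(first + 1, last).trim();
        if (host.isEmpty()) {
            throw new IllegalArgumentException("Missing host in: " + line);
        }
        int port;
        try {
            port = Integer.parseInt(line.substring(last + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in: " + line, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + line);
        }
        return new ConnectRequest(host, port);
    }
}
```

With something like this in place, the handler could reply with an error and close the channel on IllegalArgumentException rather than failing on an array index or number format error halfway through connection setup.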

Outbound channel handler

public OutBoundTargetHandler(Channel inboundChannel) {
    //   System.out.println("Initlizing target pool");
    this.inboundChannel = inboundChannel;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //  System.out.println("Activating Chanel");
    ctx.read();
    ctx.write(Unpooled.EMPTY_BUFFER);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    //   System.out.println("Receving data");
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                ctx.channel().read();
            } else {
                future.channel().close();
            }
        }
    });
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (inboundChannel.isActive()) {
        inboundChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    if (ctx.channel().isActive()) {
        ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}

Everything works as expected; the only issue is that my client bootstrap is not closed after the request is finished, so my thread count increases by one for each request. Any tips on this?

See What does it mean if a question is "closed" or "on hold"? There are either too many possible answers, or good answers would be too long for this format. Please add details to narrow the answer set or to isolate an issue that can be answered in a few paragraphs. - durron597

1 Answer

I believe you need to create a single client Bootstrap globally (Bootstrap is the Netty 4 counterpart of Netty 3's ClientBootstrap), then use it within the read method of your server handler, so that it creates a connection to the chosen remote server.

The Bootstrap doesn't need a specific remote server when it is defined, only at connect time.

If necessary, you may need to add or remove some handlers in the pipeline of the channel created for the chosen server. Once the client channel is active, you can do so as needed, for instance basing your choice on the remote address.

Then you can safely continue and proxy your packets.
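
As a rough illustration of the advice above, the sketch below builds the client Bootstrap on an EventLoopGroup that already exists instead of creating a new NioEventLoopGroup per handler, which is the likely source of the extra thread per request. It assumes Netty 4.x; BackendConnector and its method names are hypothetical, and the options shown are only a subset of those in the question.

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;

// Hypothetical helper: creates backend connections on a shared event loop group.
final class BackendConnector {
    private BackendConnector() {
    }

    // Builds a client Bootstrap bound to an existing group, so no new threads are started.
    static Bootstrap newBootstrap(EventLoopGroup sharedGroup,
                                  Class<? extends Channel> channelClass,
                                  ChannelHandler backendHandler) {
        return new Bootstrap()
                .group(sharedGroup)
                .channel(channelClass)
                .option(ChannelOption.AUTO_READ, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000)
                .handler(backendHandler);
    }

    // The remote address is only supplied here, at connect time.
    static ChannelFuture connect(EventLoopGroup sharedGroup,
                                 Class<? extends Channel> channelClass,
                                 ChannelHandler backendHandler,
                                 String host, int port) {
        return newBootstrap(sharedGroup, channelClass, backendHandler).connect(host, port);
    }
}
```

Inside the server handler this could be called with ctx.channel().eventLoop() as the shared group and ctx.channel().getClass() as the channel class, which is the same pattern the proxy example shipped with Netty uses. The backend channel then runs on the inbound channel's thread, and closing the backend channel releases it without any group to shut down.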

EDIT:

You can attach information to a channel or context using attr(key).set(primaryContext or primaryChannel), so that you can pass it along to the next handler. Something like:

static final AttributeKey<Channel> PRIMARY_CHANNEL =
   AttributeKey.valueOf(YourHandler.class.getName() + ".PRIMARY_CHANNEL");

ChannelFuture future = bootstrap.connect(destination);
future.addListener(new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) {
         // Perform post-connection operation
         if (future.isSuccess()) {
            // Store the primary channel (the first channel in the proxy chain)
            // on the newly connected channel
            future.channel().attr(PRIMARY_CHANNEL).set(primaryChannel);
         } else {
            // Report the connection error back to the requester
            primaryCtx.writeAndFlush(error);
         }
     }
 });

Then, in the second channel handler, do what you need to do with this primaryChannel...