0
votes

I built a websocket client based on netty websocket client example to integrate with 3rd party feed, the code is almost same as the official example, only changed the URI and the TextWebSocketFrame handling.

This client is part of a Spring Boot webapp, and it works fine on my MacBook or Windows 7 PC, however, once I deploy the war to Linux server (Linux version 2.6.32-504.3.3.el6.x86_64 Red Hat 4.4.7-11), the channel would go inactive directly when the application starts.

I have checked the network, JDK version(1.8.x), Tomcat version (8.5.16), Netty version(4.1.13.Final), seems no problem, really confused.

WebSocketClientHandler.java:

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private final Logger logger = LoggerFactory.getLogger("feedlogger");

    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // always reaches here immediately on Linux
        logger.error("webSocket Client disconnected!");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
            logger.info("finishHandshake and webSocket Client Connected!");
            handshakeFuture.setSuccess();
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            logger.error("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
                + response.content().toString(CharsetUtil.UTF_8) + ')');
            return;
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            logger.info(textFrame.text());
            List<String> messages = FeedJsonUtils.deserializeAsyncMessage(textFrame.text());
            if (!CollectionUtils.isEmpty(messages)) {
                messages.stream().forEach(message -> {
                        FeedReceiver.onMessage(message);
                });
            }
        } else if (frame instanceof PongWebSocketFrame) {
            logger.error("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            logger.error("WebSocket Client received closing");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
    {
        logger.error("exception caught: ", cause);
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

WebSocketClientImpl.java:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;


@Component
public class WebsocketClientImpl {
    private final Logger logger = LoggerFactory.getLogger("feedlogger");

    private static final String WSS_URL = "wss://api.xxx.com/ws/?token=";

    private Channel ch = null;

    @Async
    public void connectWebSocket(String token) throws Exception {

        URI uri = new URI(WSS_URL + token);
        String scheme = uri.getScheme() == null ? "wss" : uri.getScheme();
        final String host = uri.getHost() == null ? "api.mollybet.com" : uri.getHost();

        final int port;
        if (uri.getPort() == -1) {
            if ("ws".equalsIgnoreCase(scheme)) {
                port = 80;
            } else if ("wss".equalsIgnoreCase(scheme)) {
                port = 443;
            } else {
                port = -1;
            }
        } else {
            port = uri.getPort();
        }

        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
            logger.error("Only WS(S) is supported.");
            return;
        }

        // SSL handling
        final boolean ssl = "wss".equalsIgnoreCase(scheme);
        final SslContext sslCtx;
        if (ssl) {
            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
        // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08
        // or V00.
        // If you change it to V00, ping is not supported and remember to
        // change
        // HttpResponseDecoder to WebSocketHttpResponseDecoder in the
        // pipeline.
        final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
                .newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));

        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                if (sslCtx != null) {
                    p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                }
                p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
                        WebSocketClientCompressionHandler.INSTANCE, handler);
                }
            });

            ch = b.connect(uri.getHost(), port).sync().channel();
            handler.handshakeFuture().sync();

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String msg = console.readLine();
                if (msg == null) {
                    break;
                } else if ("bye".equals(msg.toLowerCase())) {
                    ch.writeAndFlush(new CloseWebSocketFrame());
                    ch.closeFuture().sync();
                    break;
                } else if ("ping".equals(msg.toLowerCase())) {
                    WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
                ch.writeAndFlush(frame);
                } else {
                    WebSocketFrame frame = new TextWebSocketFrame(msg);
                    ch.writeAndFlush(frame);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public void send(String message) {
        if (ch == null) {
            logger.error("channel unavailable");
            return;
        }
        WebSocketFrame frame = new TextWebSocketFrame(message);
        ch.writeAndFlush(frame);
    }


}

FeedServiceImpl.java: (connect websocket automatically when application starts)

@Service
@PropertySource("classpath:api.properties")
public class FeedServiceImpl implements FeedService {
    private final static Logger logger = LoggerFactory.getLogger("feedlogger");

    @Autowired
    private Environment env;

    @Autowired
    private WebsocketClientImpl websocketClientImpl;

    @Override
    @PostConstruct
    public void connectWs() {
        String token = env.getProperty("ws.token", String.class);
        websocketClientImpl.connectWebSocket(token);
    }

}

}

1

1 Answers

0
votes

Finally I solved this problem, my WebSocketClient class is almost same as the WebSocketClient in the netty websocket client example, after replacing the following code:

BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
    String msg = console.readLine();
    if (msg == null) {
        break;
    } else if ("bye".equals(msg.toLowerCase())) {
        ch.writeAndFlush(new CloseWebSocketFrame());
        ch.closeFuture().sync();
        break;
    } else if ("ping".equals(msg.toLowerCase())) {
        WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
        ch.writeAndFlush(frame);
    } else {
        WebSocketFrame frame = new TextWebSocketFrame(msg);
        ch.writeAndFlush(frame);
    }
}

with:

Thread.sleep(Integer.MAX_VALUE);

the websocket connection back to normal now.