I built a websocket client based on netty websocket client example to integrate with 3rd party feed, the code is almost same as the official example, only changed the URI and the TextWebSocketFrame handling.
This client is part of a Spring Boot webapp, and it works fine on my MacBook or Windows 7 PC, however, once I deploy the war to Linux server (Linux version 2.6.32-504.3.3.el6.x86_64 Red Hat 4.4.7-11), the channel would go inactive directly when the application starts.
I have checked the network, JDK version(1.8.x), Tomcat version (8.5.16), Netty version(4.1.13.Final), seems no problem, really confused.
WebSocketClientHandler.java:
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final Logger logger = LoggerFactory.getLogger("feedlogger");
private final WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// always reaches here immediately on Linux
logger.error("webSocket Client disconnected!");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
logger.info("finishHandshake and webSocket Client Connected!");
handshakeFuture.setSuccess();
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
logger.error("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content="
+ response.content().toString(CharsetUtil.UTF_8) + ')');
return;
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
logger.info(textFrame.text());
List<String> messages = FeedJsonUtils.deserializeAsyncMessage(textFrame.text());
if (!CollectionUtils.isEmpty(messages)) {
messages.stream().forEach(message -> {
FeedReceiver.onMessage(message);
});
}
} else if (frame instanceof PongWebSocketFrame) {
logger.error("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
logger.error("WebSocket Client received closing");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
logger.error("exception caught: ", cause);
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}
WebSocketClientImpl.java:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
@Component
public class WebsocketClientImpl {
private final Logger logger = LoggerFactory.getLogger("feedlogger");
private static final String WSS_URL = "wss://api.xxx.com/ws/?token=";
private Channel ch = null;
@Async
public void connectWebSocket(String token) throws Exception {
URI uri = new URI(WSS_URL + token);
String scheme = uri.getScheme() == null ? "wss" : uri.getScheme();
final String host = uri.getHost() == null ? "api.mollybet.com" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
logger.error("Only WS(S) is supported.");
return;
}
// SSL handling
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08
// or V00.
// If you change it to V00, ping is not supported and remember to
// change
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the
// pipeline.
final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory
.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE, handler);
}
});
ch = b.connect(uri.getHost(), port).sync().channel();
handler.handshakeFuture().sync();
BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = console.readLine();
if (msg == null) {
break;
} else if ("bye".equals(msg.toLowerCase())) {
ch.writeAndFlush(new CloseWebSocketFrame());
ch.closeFuture().sync();
break;
} else if ("ping".equals(msg.toLowerCase())) {
WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] { 8, 1, 8, 1 }));
ch.writeAndFlush(frame);
} else {
WebSocketFrame frame = new TextWebSocketFrame(msg);
ch.writeAndFlush(frame);
}
}
} finally {
group.shutdownGracefully();
}
}
public void send(String message) {
if (ch == null) {
logger.error("channel unavailable");
return;
}
WebSocketFrame frame = new TextWebSocketFrame(message);
ch.writeAndFlush(frame);
}
}
FeedServiceImpl.java: (connect websocket automatically when application starts)
@Service
@PropertySource("classpath:api.properties")
public class FeedServiceImpl implements FeedService {
private final static Logger logger = LoggerFactory.getLogger("feedlogger");
@Autowired
private Environment env;
@Autowired
private WebsocketClientImpl websocketClientImpl;
@Override
@PostConstruct
public void connectWs() {
String token = env.getProperty("ws.token", String.class);
websocketClientImpl.connectWebSocket(token);
}
}
}