This is my first question on Stack Overflow, and I hope I have adhered to the expected standards.
I have taken over some code from someone who no longer works here, and I'm pretty much stranded. I have searched and asked colleagues (who unfortunately have little Java experience), but no one has been able to help me.
I'm sending JSON requests from a client to a Netty server; the client intentionally does NOT use Netty. For now it is just a plain Java socket, but the intention is to eventually have a Flask client send requests to the Netty server. The requests arrive (both from the Java socket and from Python/Flask) and are processed correctly in the pipeline, but I also want to send a response back to the client. Although I suspect I know where in the code the response should be sent, I'm clearly missing something, because no response ever arrives. Any suggestions?
The Java socket client (note that the json1 and json2 strings have been omitted from this snippet because they are rather long, but they are properly formatted). It posts requests using a Socket and its output stream. The response-reading part (using the input stream of the same socket) is just a test that I have my doubts about, but I'm not sure how else to do it, which is why I kept it here. I've seen plenty of examples where the client also implements Netty interfaces, and that seems to work fine, but as said, I want a client that does NOT use Netty to be able to receive the responses as well (if that's possible at all).
String serverResponse;
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 1000; i++) {
try {
Socket s = new Socket("localhost", 12000);
PrintWriter out = new PrintWriter(s.getOutputStream(), true);
out.write(json1 + i + json2);
out.flush();
// Testing only - trying to get the response back from the server
BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
// readLine() blocks until a '\n'-terminated line arrives; it returns null at end of stream
serverResponse = in.readLine();
if (serverResponse != null) {
    log.info("server says: {}", serverResponse);
}
out.close();
s.close();
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
Thread.sleep(2000);
}
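As an aside, readLine() only returns once a newline-terminated line arrives (or the stream is closed), so this client will block indefinitely if the server never sends a '\n'. Below is a minimal sketch of the same exchange with a read timeout, assuming the same json1/json2 strings and log from the snippet above (the 5-second value is an arbitrary choice of mine, not from the original code):
try (Socket s = new Socket("localhost", 12000)) {
    s.setSoTimeout(5000); // don't block forever waiting for a response
    PrintWriter out = new PrintWriter(s.getOutputStream(), true);
    out.write(json1 + i + json2);
    out.flush();
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
    // readLine() needs a '\n'-terminated response from the server; returns null at EOF
    String serverResponse = in.readLine();
    if (serverResponse != null) {
        log.info("server says: {}", serverResponse);
    }
} catch (SocketTimeoutException e) { // java.net.SocketTimeoutException
    log.warn("no response within 5 seconds");
} catch (IOException e) {
    log.error("request failed", e);
}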
MFTcpServer.java
/**
* Abstract TCP server class. This class should be subclassed to implement an actual server.
*
* @param <R> the type of data to be read from the socket
* @param <W> the type of data to be written to the socket (in case of duplex communication)
*/
public abstract class MFTcpServer<R, W> {
protected final AtomicBoolean started;
protected MFTcpServer() {
this.started = new AtomicBoolean();
}
/**
* Start the server.
*
* @param initializer the channel initializer; it will be called when a new client connects to the server
* @return instance of tcp server
*/
public final MFTcpServer<R, W> start(ChannelInitializer<Channel> initializer) {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("Server already started");
}
doStart(initializer);
return this;
}
/**
* Start the server and block until it has shut down.
* @param initializer the channel initializer; it will be called when a new client connects to the server
*/
public final void startAndAwait(ChannelInitializer<Channel> initializer) {
start(initializer);
awaitShutdown();
}
/**
* Shut down the server.
* @return true if successfully shutdown.
*/
public final boolean shutdown() {
return !started.compareAndSet(true, false) || doShutdown();
}
/**
* Block until the server has shut down.
*/
public abstract void awaitShutdown();
/**
* Do the shutdown now.
* @return true if successfully shutdown
*/
public abstract boolean doShutdown();
/**
* Start the server.
* @param initializer the channel initializer; it will be called when a new client connects to the server
* @return instance of tcp server
*/
public abstract MFTcpServer<R, W> doStart(ChannelInitializer<Channel> initializer);
/**
*
* @return the port where the server is running.
*/
public abstract int getPort();
}
MFNetty4TcpServer.java The actual server implementation.
public class MFNetty4TcpServer<R, W> extends MFTcpServer<R, W> {
private static final Logger logger = LoggerFactory.getLogger(MFNetty4TcpServer.class);
private static final int BOSS_THREAD_POOL_SIZE = 2;
private int port;
private ServerBootstrap bootstrap;
private ChannelFuture bindFuture;
/**
* The constructor.
*
* @param port port where to listen
*/
protected MFNetty4TcpServer(int port) {
this.port = port;
final NioEventLoopGroup bossGroup = new NioEventLoopGroup(0,
        new DefaultEventExecutorGroup(BOSS_THREAD_POOL_SIZE));
final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0,
        new DefaultEventExecutorGroup(JsonProducerConfig.THREAD_POOL_SIZE));
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class);
}
@Override
public MFNetty4TcpServer<R, W> doStart(ChannelInitializer<Channel> initializer) {
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
if (initializer != null) {
ch.pipeline().addLast(initializer);
}
}
});
try {
bindFuture = bootstrap.bind(port).sync();
if (!bindFuture.isSuccess()) {
// Connection not successful
throw new RuntimeException(bindFuture.cause());
}
SocketAddress localAddress = bindFuture.channel().localAddress();
if (localAddress instanceof InetSocketAddress) {
port = ((InetSocketAddress) localAddress).getPort();
logger.info("Started server at port: " + port);
}
} catch (InterruptedException e) {
logger.error("Error waiting for binding server port: " + port, e);
}
return this;
}
@Override
public void awaitShutdown() {
try {
bindFuture.channel().closeFuture().await();
} catch (InterruptedException e) {
Thread.interrupted(); // Reset the interrupted status
logger.error("Interrupted while waiting for the server socket to close.", e);
}
}
@Override
public boolean doShutdown() {
try {
bindFuture.channel().close().sync();
return true;
} catch (InterruptedException e) {
logger.error("Failed to shutdown the server.", e);
return false;
}
}
@Override
public int getPort() {
return port;
}
/**
* Creates a tcp server at the defined port.
*
* @param port port to listen to
* @param <R> data to be read
* @param <W> data to be written back. Only in case of duplex connection.
* @return instance of tcp server.
*/
public static <R, W> MFTcpServer<R, W> create(int port) {
return new MFNetty4TcpServer<>(port);
}
}
JsonProducerConfig.java The pipeline is set up here.
/**
* Spring Configuration class of the application.
*/
@Configuration
@Import({DatabusConfig.class})
public class JsonProducerConfig {
private static final Logger log = LoggerFactory.getLogger(JsonProducerConfig.class);
public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
public static final String TCP_SERVER = "tcpServer";
public static final String CHANNEL_PIPELINE_INITIALIZER = "channel_initializer";
public static final String MF_KAFKA_PRODUCER = "mf_kafka_producer";
public static final String JSON_AVRO_CONVERTOR = "jsonAvroConvertor";
@Value("#{systemProperties['tcpserver.port']?:'12000'}")
private String tcpServerPort;
@Bean(name = TCP_SERVER)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public MFTcpServer nettyTCPServer() {
return MFNetty4TcpServer.create(Integer.parseInt(tcpServerPort));
}
@Bean(name = MF_KAFKA_PRODUCER)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public MFKafkaProducer pushToKafka() {
return new MFKafkaProducer();
}
@Bean(name = JSON_AVRO_CONVERTOR)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public JsonAvroConvertor jsonAvroConvertor() {
return new JsonAvroConvertor();
}
/**
* This is where the pipeline is set for processing of events.
*
* @param jsonAvroConvertor converts json to avro
* @param kafkaProducer pushes to kafka
* @return the channel initializer for the pipeline
*/
@Bean(name = CHANNEL_PIPELINE_INITIALIZER)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ChannelInitializer<Channel> channelInitializers(JsonAvroConvertor jsonAvroConvertor,
MFKafkaProducer kafkaProducer) {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
if (log.isInfoEnabled())
log.info("initChannel - initing channel...");
channel.pipeline().addLast(new NioEventLoopGroup(0, new DefaultEventExecutorGroup(THREAD_POOL_SIZE)));
channel.pipeline().addLast(new JsonObjectDecoder(1048576));
channel.pipeline().addLast(jsonAvroConvertor);
channel.pipeline().addLast(kafkaProducer);
if (log.isInfoEnabled())
log.info("channel = " + channel.toString());
}
};
}
}
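One thing worth noting about this pipeline: JsonObjectDecoder is inbound-only, and no outbound encoder is installed. Writing a ByteBuf back to the client works without one, but writing a plain String outbound would require an encoder. A sketch of what adding one inside initChannel could look like (StringEncoder is Netty's io.netty.handler.codec.string.StringEncoder; placing it here is my assumption, not part of the original setup):
// Only needed if handlers write String objects outbound; ByteBuf writes need no encoder.
channel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));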
JsonProducer.java The main program
public class JsonProducer {
private static final Logger log = LoggerFactory.getLogger(JsonProducer.class);
private static MFTcpServer tcpServer;
/**
* Main startup method
*
* @param args not used
*/
public static void main(String[] args) {
System.setProperty("solschema", "false");
try {
// the shutdown hook.
Runtime.getRuntime().addShutdownHook(new Thread(
() -> {
if (tcpServer != null) {
tcpServer.shutdown();
}
}
));
AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(JsonProducerConfig.class);
tcpServer = (MFTcpServer) context.getBean(JsonProducerConfig.TCP_SERVER);
ChannelInitializer<Channel> channelInitializer = (ChannelInitializer<Channel>) context.
getBean(JsonProducerConfig.CHANNEL_PIPELINE_INITIALIZER);
tcpServer.startAndAwait(channelInitializer);
} catch (Exception t) {
log.error("Error while starting JsonProducer ", t);
System.exit(-1);
}
}
}
MFKafkaProducer.java, the last handler in the pipeline. Note the ctx.writeAndFlush(msg) in the channelRead method, which is where I understand the response should be initiated. But what comes after that? When I run this, channelFuture.isSuccess() evaluates to false. The response object was an attempt at a String response.
@ChannelHandler.Sharable
public class MFKafkaProducer extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(MFKafkaProducer.class);
@Resource
ApplicationContext context;
@Resource(name = DatabusConfig.ADMIN)
Admin admin;
private Map<String, IProducer> streams = new HashMap<>();
@PreDestroy
public void stop() {
removeAllStreams(); // then stop writing to producers
}
/**
* @param clickRecord the record to be pushed to kafka
* @throws Exception
*/
public void handle(GenericRecord clickRecord) throws Exception {
Utf8 clientId = null;
try {
clientId = (Utf8) clickRecord.get(SchemaUtil.APP_ID);
stream(producer(clientId.toString()), clickRecord);
} catch (Exception e) {
String message = "Could not push click data for clientId:" + clientId;
log.warn("handle - " + message + "!!!", e);
assert clientId != null;
removeStream(clientId.toString());
}
}
/**
* removes all the streams
*/
private void removeAllStreams() {
Set<String> strings = streams.keySet();
for (String clientId : strings) {
removeStream(clientId);
}
}
/**
* removes a particular stream
*
* @param clientId the stream to be removed
*/
private void removeStream(String clientId) {
Assert.notEmpty(streams);
IProducer producer = streams.get(clientId);
producer.stopProducer();
streams.remove(clientId);
}
/**
* @param producer the producer where data needs to be written
* @param clickRecord the record to be written
*/
private void stream(IProducer producer, GenericRecord clickRecord) {
producer.send(clickRecord);
}
/**
* This will create a producer in case it is not already created.
* If already created return the already present one
*
* @param clientId stream id
* @return the producer instance
*/
private IProducer producer(String clientId) {
if (streams.containsKey(clientId)) {
return streams.get(clientId);
} else {
IProducer producer = admin.createKeyTopicProducer(SchemaUtil.APP_ID, "test_" + clientId, new ICallback() {
@Override
public void onSuccess(long offset) {
if (log.isInfoEnabled())
log.info("onSuccess - Data at offset:" + offset + " send.");
}
@Override
public void onError(long offset, Exception ex) {
if (log.isInfoEnabled())
log.info("onError - Data at offset:" + offset + " failed. Exception: ", ex);
}
@Override
public void onStreamClosed() {
log.warn("onStreamClosed - Stream:" + clientId + " closed.");
removeStream(clientId);
}
});
producer.startProducer();
streams.put(clientId, producer);
return producer;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.debug("KafkaProducer - channelRead() called with " + "ctx = [" + ctx + "], msg = [" + msg + "]");
if (msg instanceof GenericRecord) {
GenericRecord genericRecord = (GenericRecord) msg;
try {
handle(genericRecord);
log.debug("channelRead sending response");
Charset charset = Charset.defaultCharset();
ByteBuf response = Unpooled.copiedBuffer("Just a response", charset);
ChannelFuture future = ctx.writeAndFlush(msg); // note: this writes the inbound msg, not the 'response' buffer created above
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess())
log.info("channelRead - future.operationComplete - Response has been delivered to all channels");
else
log.info("channelRead - future.operationComplete - Response has NOT been delivered to all channels");
}
});
} catch (Exception ex) {
log.error("Something went wrong processing the generic record: " + msg + "\n ", ex);
}
} else {
log.debug("KafkaProducer - msg not of Type Generic Record !!! " + msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
log.error("Something went wrong writing to Kafka: \n", cause);
ctx.close();
}
}
Comments:
"Shouldn't you write the ByteBuf response back to the channel instead of the received msg? ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset); ChannelFuture future = ctx.writeAndFlush(response);" - Moh-Aw
"@Moh-Aw: I do get successful delivery of the message now from the ChannelFuture, but somehow I don't see it arriving at my client yet (it is a very simplistic client, which might be the cause)." - Martijn Kamstra
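Putting the comments together, here is a sketch of how the write in channelRead could look so that the simple socket client above can actually read the reply with readLine(). The trailing '\n' and the failure-cause logging are my additions, not confirmed against the original code:
ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString() + "\n", charset); // '\n' lets the client's readLine() return
ChannelFuture future = ctx.writeAndFlush(response); // write the ByteBuf, not the inbound msg
future.addListener((ChannelFutureListener) f -> {
    if (f.isSuccess()) {
        log.info("channelRead - response delivered");
    } else {
        log.warn("channelRead - response NOT delivered", f.cause()); // inspect the cause when the write fails
    }
});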