1
votes

This is my first question on StackOverflow and I hope I have adhered to the expected standards.

I have been taking over some code from someone else who isn't working here anymore and I'm pretty much stranded here. I searched and asked some colleagues (not too much Java experience unfortunately) but no-one seems to be able to help me. Searching didn't really help me either.

I'm sending Json requests to a Netty server from a client which intentionally is NOT implemented using Netty. For now it is just a simple Java socket, but the intention is to have a Flask client send requests to the Netty server. The requests arrive (both using Java Sockets and using Python Flask), and get properly processed in the pipeline, but I want to send a response to the client and although I suspect where in the code to send the response I'm clearly missing out on something as I don't get any response. Any suggestions?

The Java Socket client (note that the json1 and json2 strings have been omitted from the snippet here as they are rather long, but they are formatted properly). Posting requests using a Socket and the related output stream. The response part (with the input stream for the same socket) is just some test which I have my doubt about, but not sure how to do this otherwise (and that's why I kept it here). I've been seeing plenty of examples with clients implementing Netty interfaces and that seems to work fine, but as said I want a client not using Netty to be able to receive the responses as well (if that's possible at all).

String serverResponse;

for (int j = 0; j < 100; j++) {
    for (int i = 0; i < 1000; i++) {
        try {
            Socket s = new Socket("localhost", 12000);
            PrintWriter out = new PrintWriter(s.getOutputStream(), true);
            out.write(json1 + i + json2);
            out.flush();

            // Testing only - trying to get the response back from the server
            BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
            while(true) {
                if ((serverResponse = in.readLine()) != null) {
                    log.info("server says", serverResponse);
                    break;
                }
            }

            out.close();
            s.close();
            Thread.sleep(1000);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    Thread.sleep(2000);
}

MCTcpServer.java

/**
 * Abstract TCP Server class. this class should be implemented in the subclass to implement an actual server.
 *
 * @param <R> The data to be read from the socket.
 * @param <W> data to be written (in case of duplex) from the socket.
 */

public abstract class MFTcpServer<R, W> {

    protected final AtomicBoolean started;

    protected MFTcpServer() {
        this.started = new AtomicBoolean();
    }

    /**
     * Start the server.
     *
     * @param initializer the channel initializers. they will be called when a new client connects to the server.
     * @return instance of tcp server
     */
    public final MFTcpServer<R, W> start(ChannelInitializer<Channel> initializer) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Server already started");
        }

        doStart(initializer);
        return this;
    }

    /**
     * Start the server and wait for all the threads to be finished before shutdown.
     * @param initializer the channel initializers. they will be called when a new client connects to the server.
     */
    public final void startAndAwait(ChannelInitializer<Channel> initializer) {
        start(initializer);
        awaitShutdown();
    }

    /**
     * Shutdown the server
     * @return true if successfully shutdown.
     */
    public final boolean shutdown() {
        return !started.compareAndSet(true, false) || doShutdown();
    }

    /**
     * Wait for all the threads to be finished before shutdown.
     */
    public abstract void awaitShutdown();

    /**
     * Do the shutdown now.
     * @return true if successfully shutdown
     */
    public abstract boolean doShutdown();

    /**
     * start the server
     * @param initializer the channel initializers. they will be called when a new client connetcs to the server.
     * @return instance of tcp server
     */
    public abstract MFTcpServer<R, W> doStart(ChannelInitializer<Channel> initializer);

    /**
     *
     * @return the port where the server is running.
     */
    public abstract int getPort();

MFNetty4TcpServer.java Actual server implementation

public class MFNetty4TcpServer<R, W> extends MFTcpServer<R, W> {

    private static final Logger logger = LoggerFactory.getLogger(MFNetty4TcpServer.class);
    private static final int BOSS_THREAD_POOL_SIZE = 2;

    private int port;
    private ServerBootstrap bootstrap;
    private ChannelFuture bindFuture;

    /**
     * The constructor.
     *
     * @param port port where to listen
     */
    protected MFNetty4TcpServer(int port) {
        this.port = port;
        final NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (BOSS_THREAD_POOL_SIZE));
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (JsonProducerConfig.THREAD_POOL_SIZE));

        bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class);
    }

    @Override
    public MFNetty4TcpServer<R, W> doStart(ChannelInitializer<Channel> initializer) {
        bootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {

                if (initializer != null) {
                    ch.pipeline().addLast(initializer);
                }
            }
        });

        try {
            bindFuture = bootstrap.bind(port).sync();
            if (!bindFuture.isSuccess()) {
                // Connection not successful
                throw new RuntimeException(bindFuture.cause());
            }
            SocketAddress localAddress = bindFuture.channel().localAddress();
            if (localAddress instanceof InetSocketAddress) {
                port = ((InetSocketAddress) localAddress).getPort();
                logger.info("Started server at port: " + port);
            }

        } catch (InterruptedException e) {
            logger.error("Error waiting for binding server port: " + port, e);
        }

        return this;
    }

    @Override
    public void awaitShutdown() {
        try {
            bindFuture.channel().closeFuture().await();
        } catch (InterruptedException e) {
            Thread.interrupted(); // Reset the interrupted status
            logger.error("Interrupted while waiting for the server socket to close.", e);
        }
    }

    @Override
    public boolean doShutdown() {
        try {
            bindFuture.channel().close().sync();
            return true;
        } catch (InterruptedException e) {
            logger.error("Failed to shutdown the server.", e);
            return false;
        }
    }

    @Override
    public int getPort() {
        return port;
    }

    /**
     * Creates a tcp server at the defined port.
     *
     * @param port port to listen to
     * @param <R>  data to be read
     * @param <W>  data to be written back. Only in case of duplex connection.
     * @return instance of tcp server.
     */
    public static <R, W> MFTcpServer<R, W> create(int port) {
        return new MFNetty4TcpServer<>(port);
    }

}

JsonProducerConfig.java The pipeline is setup here.

/**
 * Spring Configuration class of the application.
 */
@Configuration
@Import({DatabusConfig.class})
public class JsonProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(JsonProducerConfig.class);

    public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    public static final String TCP_SERVER = "tcpServer";
    public static final String CHANNEL_PIPELINE_INITIALIZER = "channel_initializer";
    public static final String MF_KAFKA_PRODUCER = "mf_kafka_producer";
    public static final String JSON_AVRO_CONVERTOR = "jsonAvroConvertor";

    @Value("#{systemProperties['tcpserver.port']?:'12000'}")
    private String tcpServerPort;

    @Bean(name = TCP_SERVER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFTcpServer nettyTCPServer() {
        return MFNetty4TcpServer.create(Integer.parseInt(tcpServerPort));
    }

    @Bean(name = MF_KAFKA_PRODUCER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFKafkaProducer pushToKafka() {
        return new MFKafkaProducer();
    }

    @Bean(name = JSON_AVRO_CONVERTOR)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public JsonAvroConvertor jsonAvroConvertor() {
        return new JsonAvroConvertor();
    }

    /**
     * This is where the pipeline is set for processing of events.
     *
     * @param jsonAvroConvertor converts json to avro
     * @param kafkaProducer     pushes to kafka
     * @return chanenl initializers pipeline.
     */
    @Bean(name = CHANNEL_PIPELINE_INITIALIZER)
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public ChannelInitializer<Channel> channelInitializers(JsonAvroConvertor jsonAvroConvertor,
                                                           MFKafkaProducer kafkaProducer) {
        return new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {

                if (log.isInfoEnabled())
                    log.info("initChannel - initing channel...");

                channel.pipeline().addLast(new NioEventLoopGroup(0, new DefaultEventExecutorGroup(THREAD_POOL_SIZE)));
                channel.pipeline().addLast(new JsonObjectDecoder(1048576));
                channel.pipeline().addLast(jsonAvroConvertor);
                channel.pipeline().addLast(kafkaProducer);

                if (log.isInfoEnabled())
                    log.info("channel = " + channel.toString());
            }
        };
    }

}

JsonProducer.java The main program

public class JsonProducer {

    private static final Logger log = LoggerFactory.getLogger(JsonProducer.class);

    private static MFTcpServer tcpServer;

    /**
     * Main startup method
     *
     * @param args not used
     */
    public static void main(String[] args) {
        System.setProperty("solschema", "false");

        try {

            // the shutdown hook.
            Runtime.getRuntime().addShutdownHook(new Thread(
                    () -> {
                        if (tcpServer != null) {
                            tcpServer.shutdown();
                        }
                    }
            ));

            AnnotationConfigApplicationContext context = new
                    AnnotationConfigApplicationContext(JsonProducerConfig.class);

            tcpServer = (MFTcpServer) context.getBean(JsonProducerConfig.TCP_SERVER);

            ChannelInitializer<Channel> channelInitializer = (ChannelInitializer<Channel>) context.
                    getBean(JsonProducerConfig.CHANNEL_PIPELINE_INITIALIZER);

            tcpServer.startAndAwait(channelInitializer);

        } catch (Exception t) {
            log.error("Error while starting JsonProducer ", t);
            System.exit(-1);
        }
    }
}

The MFKafkaProducer.java as the last channel in the pipeline. Note the ctx.writeAndFlush(msg) in the channelRead method which is where I understand the response should be initiated. But what after that. When running this channelFuture.isSuccess() evaluates to false. The response object was an attempt to a String response.

@ChannelHandler.Sharable
public class MFKafkaProducer extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(MFKafkaProducer.class);

    @Resource
    ApplicationContext context;

    @Resource(name = DatabusConfig.ADMIN)
    Admin admin;

    private Map<String, IProducer> streams = new HashMap<>();

    @PreDestroy
    public void stop() {
        removeAllStreams(); // then stop writing to producers
    }

    /**
     * @param clickRecord the record to be pushed to kafka
     * @throws Exception
     */
    public void handle(GenericRecord clickRecord) throws Exception {
        Utf8 clientId = null;
        try {
            clientId = (Utf8) clickRecord.get(SchemaUtil.APP_ID);
            stream(producer(clientId.toString()), clickRecord);
        } catch (Exception e) {
            String message = "Could not push click data for clientId:" + clientId;
            log.warn("handle - " + message + "!!!", e);
            assert clientId != null;
            removeStream(clientId.toString());
        }
    }

    /**
     * removes all the streams
     */
    private void removeAllStreams() {
        Set<String> strings = streams.keySet();

        for (String clientId : strings) {
            removeStream(clientId);
        }
    }

    /**
     * removes a particular stream
     *
     * @param clientId the stream to be removed
     */
    private void removeStream(String clientId) {
        Assert.notEmpty(streams);
        IProducer producer = streams.get(clientId);
        producer.stopProducer();
        streams.remove(clientId);
    }

    /**
     * @param producer    the producer where data needs to be written
     * @param clickRecord teh record to be written
     */
    private void stream(IProducer producer, GenericRecord clickRecord) {
        producer.send(clickRecord);
    }

    /**
     * This will create a producer in case it is not already created.
     * If already created return the already present one
     *
     * @param clientId stream id
     * @return the producer instance
     */
    private IProducer producer(String clientId) {
        if (streams.containsKey(clientId)) {
            return streams.get(clientId);
        } else {
            IProducer producer = admin.createKeyTopicProducer(SchemaUtil.APP_ID, "test_" + clientId, new ICallback() {
                @Override
                public void onSuccess(long offset) {
                    if (log.isInfoEnabled())
                        log.info("onSuccess - Data at offset:" + offset + " send.");
                }

                @Override
                public void onError(long offset, Exception ex) {
                    if (log.isInfoEnabled())
                        log.info("onError - Data at offset:" + offset + " failed. Exception: ", ex);
                }

                @Override
                public void onStreamClosed() {
                    log.warn("onStreamClosed - Stream:" + clientId + " closed.");
                    removeStream(clientId);
                }
            });
            producer.startProducer();
            streams.put(clientId, producer);
            return producer;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.debug("KafkaProducer - channelRead() called with " + "ctx = [" + ctx + "], msg = [" + msg + "]");

        if (msg instanceof GenericRecord) {
            GenericRecord genericRecord = (GenericRecord) msg;
            try {
                handle(genericRecord);
                log.debug("channelRead sending response");
                Charset charset = Charset.defaultCharset();
                ByteBuf response = Unpooled.copiedBuffer("Just a response", charset);
                ChannelFuture future = ctx.writeAndFlush(msg);
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess())
                            log.info("channelRead - future.operationComplete - Response has been delivered to all channels");
                        else
                            log.info("channelRead - future.operationComplete - Response has NOT been delivered to all channels");
                    }
                });
            } catch (Exception ex) {
                log.error("Something went wrong processing the generic record: " + msg + "\n ", ex);
            }
        } else {
            log.debug("KafkaProducer - msg not of Type Generic Record !!! " + msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        log.error("Something went wrong writing to Kafka: \n", cause);
        ctx.close();
    }

}
1
I recommend to log ChannelFuture#cause() if ChannelFuture#isSuccess() returns false - forty-two
what happens if you actually write your ByteBuf response back to the channel instead of the received msg? - Moh-Aw
Thanks @forty-two, this was indeed an issue as the message being serialized was not a ByteBuf but a GenericRecord. I solved this by creating a ByteBuf from the genericRecord (just for testing purposes now to see if there is any response at all: ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset); ChannelFuture future = ctx.writeAndFlush(response); @Moh-Aw: I do get successful delivery of the message now from the ChannelFuture, but somehow I don't see it arriving at my client yet (it is a very simplistic client which might be a cause). - Martijn Kamstra
Thanks for your help, I finally solved it. It was indeed a poor implementation of the client. By using a SocketChannel instead of just a Socket I do see the replies on my client side as well. - Martijn Kamstra
I suggest that you write your solution and accept it to close this issue and so others can take advantage of it. :) - eliasah

1 Answers

0
votes

Using ChannelFuture#cause() I noticed I was not serializing a ByteBuf object, but a GenericRecord instead. Using

ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset); 
ChannelFuture future = ctx.writeAndFlush(response);

the GenericRecord gets converted to a ButeBuf and sends a response using the writeAndFlush method.

The test client using a Socket implementation somehow never really received a response, but by using a SocketChannel this was resolved as well.