0
votes

How to send the buf then receive a msg

The method

Mono<ByteBuf> send(ByteBuf buf){
    // how to send the buf then receive a msg
}

I'm trying to implement this method by sending a msg from connection outbound and receiving a msg from inbound and then returning a message Mono. But I can only receive message in then(Publisher) method. It doesn't seem to be able to return to a data Mono

I've tried this.

// the connecttion has been initialized before entering this method.

        Mono.just(buf)
                .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
                .then(connection
                        .inbound()
                        .receiveObject()
                        .single()
                        .map(RpcDataPackage.class::cast)
                        .map(RpcDataPackage::getData)
                        .map(data -> {
                            try {
                                return resCodec.decode(data);
                            } catch (IOException e) {
                                throw new RpcRequestException(e);
                            }
                        })
                );

but it would block until the connection timeout

And I've tried another code. I add a handle method and put the response to a map. Then I can get the Mono.fromSupply() with a while loop break at map.get(key) != null.

It would block the thread.

                .handle(((nettyInbound, nettyOutbound) -> nettyInbound
                        .receiveObject()
                        .map(RpcDataPackage.class::cast)
                        .doOnNext(pkg -> {
                            String responseKey = "a key"

                            responseMap.put(responseKey, pkg);
                        })
                        .then()))
3

3 Answers

0
votes

You do not specify what you expect. See the example below, it sends some data and then receives what the server's returned.

    @Test
    public void test() {
        Connection connection =
                TcpClient.create()
                         .wiretap(true)
                         .host("example.com")
                         .port(80)
                         .connect()
                         .block();

        assertNotNull(connection);

        connection.outbound()
                  .sendObject(Unpooled.wrappedBuffer("test".getBytes()))
                  .then(connection.inbound()
                                  .receiveObject()
                                  .last()
                                  .doOnNext(System.out::println)
                                  .then())
                  .then()
                  .block();
    }
0
votes

I read the Mono javadoc and found MonoSink.

Mono.create(monoSink -> {
  // some call
})

when inbound receive an object response just do sink.success()

0
votes

You should use a combination of NettyOutbound::then to listen for write completion and Mono::then to read your NettyInboud after write.

  Mono<String> resposeMono = TcpClient.create()
            .connect()
            .flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
                    .then()
                    .then(connection.inbound().receive().aggregate().asString())
                    .doOnTerminate(connection::dispose));

This will write "Hello!" to the output, read all bytes from the input as a string then dispose the connection.