How to send the buf then receive a msg
The method
Mono<ByteBuf> send(ByteBuf buf) {
    // how to send the buf then receive a msg
}
I'm trying to implement this method by sending a message through the connection's outbound, receiving the reply from its inbound, and returning that reply as a Mono. But I can only receive the message inside the then(Publisher) method, and I can't seem to get it back out as a Mono that carries the data.
I've tried this:
// the connection has been initialized before entering this method.
Mono.just(buf)
    .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
    .then(connection
        .inbound()
        .receiveObject()
        .single()
        .map(RpcDataPackage.class::cast)
        .map(RpcDataPackage::getData)
        .map(data -> {
            try {
                return resCodec.decode(data);
            } catch (IOException e) {
                throw new RpcRequestException(e);
            }
        })
    );
But it blocks until the connection times out.
I've also tried another approach. I added a handle method that puts each response into a map. Then I can build the result with Mono.fromSupplier(), using a while loop that breaks once map.get(key) != null. But that blocks the thread.
.handle(((nettyInbound, nettyOutbound) -> nettyInbound
    .receiveObject()
    .map(RpcDataPackage.class::cast)
    .doOnNext(pkg -> {
        String responseKey = "a key";
        responseMap.put(responseKey, pkg);
    })
    .then()))
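One way to avoid the busy-wait loop over the map might be to store a future per request instead of the response itself, and complete that future from the inbound handler. The sketch below uses only the JDK; PendingResponses and its methods are hypothetical names (not part of Reactor Netty), and it assumes each in-flight request has a unique key that can also be read from the response.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Reactor Netty class): matches responses to pending requests
// without polling. Assumes every in-flight request uses a unique key.
final class PendingResponses<T> {
    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    // Call before sending the request; the returned future completes when the response arrives.
    CompletableFuture<T> register(String key) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(key, future);
        // Remove the entry on any completion (value, error, or cancel) so the map does not leak.
        future.whenComplete((value, error) -> pending.remove(key, future));
        return future;
    }

    // Call from the inbound handler; returns false if no request is waiting for this key.
    boolean complete(String key, T response) {
        CompletableFuture<T> future = pending.remove(key);
        return future != null && future.complete(response);
    }

    // Number of requests still waiting for a response.
    int size() {
        return pending.size();
    }
}
```

With something like this, the doOnNext callback in handle could call complete(responseKey, pkg) instead of responseMap.put(responseKey, pkg), and send could register the key, write the buffer, and return Mono.fromFuture(future) followed by the decoding step, so no thread has to spin waiting for the map entry. How the key is derived from RpcDataPackage depends on the protocol and is left open here.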