I am using gRPC with C++. And I have an async server and sync client. The rpc is of type bidirectional stream.
Here is how I send message with client:
class ConnectionService {
public:
ConnectionService(std::shared_ptr<Channel> channel)
: stub_(Connection::NewStub(channel)) {}
void HearthBeat() {
ClientContext context;
std::shared_ptr<grpc::ClientReaderWriter<Pulse, Pulse> > stream(
stub_->HearthBeat(&context));
std::thread writer([stream]() {
for (int i = 0; i < 100; ++i) {
Pulse p;
p.set_rate(50);
stream->Write(p);
}
stream->WritesDone();
});
Pulse server_pulse;
while (stream->Read(&server_pulse)) {
std::cout << "Got message " << server_pulse.rate()<< std::endl;
}
writer.join();
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "RouteChat rpc failed." << std::endl;
}
}
private:
std::unique_ptr<Connection::Stub> stub_;
};
Here is how I read it and reply on the server:
void Vibranium::ConnectionManager::HearthBeatMethod::Create() {
connectionService_->RequestHearthBeat(&ctx_, &stream_, cq_, cq_,this);
status_ = PROCESS;
}
void Vibranium::ConnectionManager::HearthBeatMethod::Process() {
new HearthBeatMethod(connectionService_, cq_);
stream_.Read(&request_, this);
status_ = READ_CALLED;
}
bool Vibranium::ConnectionManager::HearthBeatMethod::CheckForClientMetadata() {
return false;
}
void Vibranium::ConnectionManager::HearthBeatMethod::ReadStream() {
std::cout << "Received: " << request_.rate() << std::endl;
reply_.set_rate(65);
std::cout << "Rate replied: " << reply_.rate() << std::endl;
stream_.Write(reply_, this);
status_ = WRITE_CALLED;
}
void Vibranium::ConnectionManager::HearthBeatMethod::WriteToStream() {
stream_.Finish(grpc::Status::OK, this);
status_ = FINISH;
}
Here is how I start the server:
void Vibranium::Server::Run() {
std::string server_address(serverIp_+":"+serverPort_);
grpc::ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 3000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 3000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
RegisterServices(builder);
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
void Vibranium::Server::HandleRpcs() {
RegisterMethods();
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<ServiceMethod*>(tag)->Proceed();
}
}
Here is what Proceed();
is doing:
void ServiceMethod::Proceed() {
if (status_ == CREATE) {
Create();
} else if (status_ == PROCESS) {
CheckClient();
Process();
} else if(status_ == READ_CALLED){
ReadStream();
} else if(status_ == WRITE_CALLED){
WriteToStream();
} else {
Finish();
}
}
void ServiceMethod::Finish() {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (ServiceMethod).
delete this;
}
So when I trigger the client it sends 1 message instead of 100 as described in the for loop. On server I can see output:
Received: 50
Rate replied: 65
And on client the output is:
Got message 65
So by this I can see that there is communication between the client and the server however it seems that the server receives and send back just the fist message. Why is that and how can I fix it?