1
votes

I am trying to implement an async GRPC server, where whenever the client makes a call it gets an indefinite stream of messages. I read through the official documentation. It doesn't cover the scenario where I want to keep the stream open for each RPC. This article - https://www.gresearch.co.uk/article/lessons-learnt-from-writing-asynchronous-streaming-grpc-services-in-c/ addresses the issue of keeping the stream open by basically putting the callback handler object in the completion queue again.

The article suggests:

if (HasData())
{
    responder_.Write(reply_, this);
}
else
{
    grpc::Alarm alarm;
    alarm.Set(completion_queue_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}

I tried using the Alarm object approach as suggested in this article but for some reason on the next Next function call on completion queue I get ok argument as false - GPR_ASSERT(cq_->Next(&tag, &ok));. As a result, I have to close the server and am unable to wait on a stream till further data is available.

I am able to receive data fine till the else case is not hit.

Could someone please help me identify what I might be doing wrong? I am unable to find much C++ resources on GRPC. Thanks!

1

1 Answers

2
votes
  1. When Alarm goes out of scope, it will generate a Cancel() causing you to get !ok in Next().

  2. if you want to use this, you would need to put the Alarm into your class scope and trigger it:

    std::unique_ptr<Alarm> alarm_;
    alarm_.reset(new Alarm);
    alarm_->Set(cq_, grpc_timeout_seconds_to_deadline(10), this);

from the doc on Alarm::Set:

Trigger an alarm instance on completion queue cq at the specified time.

Once the alarm expires (at deadline) or it's cancelled (see Cancel), an event with tag tag will be added to cq. If the alarm expired, the event's success bit will be true, false otherwise (ie, upon cancellation).