1
votes

I'm running a gRPC client that is connected to a streaming service. The server always sends at least one response message immediately after receiving the client request message. The server may send more messages at any time, or it may not, but the client continues processing until it is shut down. Generally, the client lifetime is significantly shorter than the server, and it needs to be shut down at any time.

My intention is to write a client that will do async read of the streaming messages until it is signaled to stop running. Here is the client, with processing omitted:

void FooClient::WatchFoo(bool& done_)
{
    WatchFooRequest request;
    WatchFooResponse reply;
    ClientContext context;
    CompletionQueue cq;
    Status status;
    void* got_tag;
    bool ok;

    std::unique_ptr<ClientAsyncReader<WatchFooResponse> >
        reader(stub_->PrepareAsyncWatchFoo(&context, request, &cq));

    reader->StartCall((void*)1);
    reader->Read(&reply, (void*)1);

    std::chrono::system_clock::time_point mytp(std::chrono::system_clock::now());

    // continue processing messages in a loop until done becomes true
    while (!done_) {
        mytp += std::chrono:seconds(1);
        auto next_status = cq.AsyncNext<std::chrono::system_clock::time_point>(&got_tag, &ok, mytp);
        if (!ok || next_status == CompletionQueue::NextStatus::SHUTDOWN) {
            break;
        }

        if (next_status == CompletionQueue::NextStatus::TIMEOUT) {
            continue;
        }

        // process reply
        // ...
    }

    context.TryCancel();

    reader->Finish(&status, (void*)1);
    cq.Shutdown();
}

I have the blocking equivalent of this code ( synchronous reader->Read is called inside the while loop ), and it receives all the messages properly. However, the async code only seems to get the first message, and does not get subsequent messages.

I'm probably misinterpreting the docs and doing something lame, but any ideas why only the first message arrives?

1

1 Answers

0
votes

Looking at your code, I'm not seeing a reader->Read being issued after your AsyncNext call in the while loop, so it'll only get one message. Each operation only matches one message.