3
votes

I am using gRPC with C++. And I have an async server and sync client. The rpc is of type bidirectional stream.

Here is how I send message with client:

class ConnectionService {
public:
    ConnectionService(std::shared_ptr<Channel> channel)
            : stub_(Connection::NewStub(channel)) {}

    void HearthBeat() {
        ClientContext context;

        std::shared_ptr<grpc::ClientReaderWriter<Pulse, Pulse> > stream(
                stub_->HearthBeat(&context));

        std::thread writer([stream]() {
            for (int i = 0; i < 100; ++i) {
                Pulse p;
                p.set_rate(50);
                stream->Write(p);
            }
            stream->WritesDone();
        });

        Pulse server_pulse;
        while (stream->Read(&server_pulse)) {
            std::cout << "Got message " << server_pulse.rate()<< std::endl;
        }
        writer.join();
        Status status = stream->Finish();
        if (!status.ok()) {
            std::cout << "RouteChat rpc failed." << std::endl;
        }
    }

private:
    std::unique_ptr<Connection::Stub> stub_;
};

Here is how I read it and reply on the server:

void Vibranium::ConnectionManager::HearthBeatMethod::Create() {
    connectionService_->RequestHearthBeat(&ctx_, &stream_, cq_, cq_,this);
    status_ = PROCESS;
}

void Vibranium::ConnectionManager::HearthBeatMethod::Process() {
    new HearthBeatMethod(connectionService_, cq_);
    stream_.Read(&request_, this);
    status_ = READ_CALLED;
}

bool Vibranium::ConnectionManager::HearthBeatMethod::CheckForClientMetadata() {
    return false;
}

void Vibranium::ConnectionManager::HearthBeatMethod::ReadStream() {
    std::cout << "Received: " << request_.rate() << std::endl;
    reply_.set_rate(65);
    std::cout << "Rate replied: " << reply_.rate() << std::endl;
    stream_.Write(reply_, this);
    status_ = WRITE_CALLED;
}

void Vibranium::ConnectionManager::HearthBeatMethod::WriteToStream() {
    stream_.Finish(grpc::Status::OK, this);
    status_ = FINISH;
}

Here is how I start the server:

void Vibranium::Server::Run() {
    std::string server_address(serverIp_+":"+serverPort_);

    grpc::ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 3000);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 3000);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 3000);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    RegisterServices(builder);
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
}

void Vibranium::Server::HandleRpcs() {
    RegisterMethods();
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
        GPR_ASSERT(cq_->Next(&tag, &ok));
        GPR_ASSERT(ok);
        static_cast<ServiceMethod*>(tag)->Proceed();
    }
}

Here is what Proceed(); is doing:

void ServiceMethod::Proceed() {
    if (status_ == CREATE) {
        Create();
    } else if (status_ == PROCESS) {
        CheckClient();
        Process();
    } else if(status_ == READ_CALLED){
        ReadStream();
    } else if(status_ == WRITE_CALLED){
        WriteToStream();
    } else {
        Finish();
    }
}

void ServiceMethod::Finish() {
    GPR_ASSERT(status_ == FINISH);
    // Once in the FINISH state, deallocate ourselves (ServiceMethod).
    delete this;
}

So when I trigger the client it sends 1 message instead of 100 as described in the for loop. On server I can see output:

Received: 50
Rate replied: 65

And on client the output is:

Got message 65

So by this I can see that there is communication between the client and the server however it seems that the server receives and send back just the fist message. Why is that and how can I fix it?

1

1 Answers

0
votes

I think the immediate cause of the crash you're seeing is that in HearthBeatMethod::Process(), you are starting both a read and write using the same tag, and that tag is not even initialized (it's a void* tag that is never given a value), so there's basically no way you can tell when either of these operations is complete. And more importantly (this is probably where the crash is happening), the code in Server::HandleRpcs() that is polling your completion queue is assuming that every tag is actually the address of a ServiceMethod object, whose Proceed() method will be called. Since the tag that's coming back is an uninitialized pointer, you're basically calling a method on an arbitrary address, which is causing the crash.

I also have a couple of comments about the broader design here, in case they're useful.

First, for a bidi streaming service, you probably need more states than simply CREATE, PROCESS, and FINISH, because you can only have one read or write in progress at once. You need to know when each read finishes so that you can wait to start the next one until after the previous one is completed. Same thing for writes. And note that reads and writes are not synchronized together at all, so they can start and finish at completely different times.

And second, it's not clear to me why you are creating a separate streaming RPC to deal with connection management. The gRPC channel itself should deal with connection management for you; in your application, you should not need to worry about that. In principle, the application should just send the individual RPCs it wants and let the channel handle the connection management for you. If the reason you care about this is that you are doing some sort of session affinity thing, then consider just using a streaming RPC for your application itself.

I hope this info is helpful.