1
votes

I want to send data in chunks of 50 with gRPC. However, when I call responseObserver.onNext multiple times, it just gets stuck and won't send the data. When I stop the stream, I do get the data.

This is my code:

// List<List<MyClass>> listOfMyClass
for (List<MyClass> myClasses : listOfMyClass) {
  responseObserver.onNext(buildReply(myClasses));
}
responseObserver.onCompleted();

With this loop the call gets stuck. But if I call responseObserver.onNext(buildReply(myClasses)); only once, I get the data instantly.

My proto:

message Request {
    string number = 1;
}

message Reply {
    repeated CustomMessage results = 1;
}

service Service {
    rpc MyRequest (Request) returns (stream Reply) {}
}

I've been using a GUI client called BloomRPC (https://github.com/uw-labs/bloomrpc), which should display the stream easily.

1 Answer

0
votes

The StreamObserver interface is non-blocking and doesn't provide back pressure for outbound data: onNext returns immediately and the message is only queued. To get back pressure on the server side, cast the responseObserver that the server uses to send the data to ServerCallStreamObserver, and watch its "ready" state through isReady() and setOnReadyHandler(). Something like this:

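import io.grpc.stub.ServerCallStreamObserver;
import java.util.Iterator;
import java.util.List;

// Inside the service method; Reply is the response type from the question's proto.
ServerCallStreamObserver<Reply> serverCallStreamObserver =
      (ServerCallStreamObserver<Reply>) responseObserver;
Iterator<List<MyClass>> chunks = listOfMyClass.iterator();
if (!chunks.hasNext()) {
  serverCallStreamObserver.onCompleted();  // nothing to send
  return;
}
// gRPC runs this handler each time the transport can accept more messages.
serverCallStreamObserver.setOnReadyHandler(new Runnable() {
    public void run() {
      sendUntilNotReady(serverCallStreamObserver, chunks);
    }
});
sendUntilNotReady(serverCallStreamObserver, chunks);

...

private void sendUntilNotReady(ServerCallStreamObserver<Reply> serverCallStreamObserver,
                               Iterator<List<MyClass>> chunks) {
  // Stop as soon as the transport is not ready; the handler resumes sending later.
  while (serverCallStreamObserver.isReady() && chunks.hasNext()) {
    serverCallStreamObserver.onNext(buildReply(chunks.next()));
    if (!chunks.hasNext()) {
      serverCallStreamObserver.onCompleted();  // last chunk sent, close the stream once
    }
  }
}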
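
To see the readiness-driven loop in isolation, here is a small self-contained sketch that runs without gRPC. ReadyObserver and FakeObserver are hypothetical stand-ins I made up for illustration; they only mimic the isReady/onNext/onCompleted methods and are not gRPC classes. The fake transport accepts a fixed number of messages, then reports "not ready" until drain() is called, which plays the role of gRPC firing the on-ready handler.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReadySendSketch {

  /** Hypothetical stand-in for the observer methods used in the answer (not a gRPC type). */
  interface ReadyObserver<T> {
    boolean isReady();
    void onNext(T value);
    void onCompleted();
  }

  /** Fake transport: accepts `capacity` messages, then is not ready until drained. */
  static class FakeObserver implements ReadyObserver<List<Integer>> {
    final List<List<Integer>> delivered = new ArrayList<>();
    final int capacity;
    int buffered = 0;
    boolean completed = false;
    Runnable onReadyHandler = () -> {};

    FakeObserver(int capacity) { this.capacity = capacity; }

    public boolean isReady() { return buffered < capacity; }
    public void onNext(List<Integer> value) { delivered.add(value); buffered++; }
    public void onCompleted() { completed = true; }

    /** Simulates the transport flushing its buffer and firing the ready callback. */
    void drain() { buffered = 0; onReadyHandler.run(); }
  }

  /** Sends chunks while the observer is ready; completes right after the last chunk. */
  static <T> void sendUntilNotReady(ReadyObserver<T> observer, Iterator<T> chunks) {
    while (observer.isReady() && chunks.hasNext()) {
      observer.onNext(chunks.next());
      if (!chunks.hasNext()) {
        observer.onCompleted();
      }
    }
  }

  /** Splits items into consecutive chunks of at most `size` elements. */
  static <T> List<List<T>> chunk(List<T> items, int size) {
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < items.size(); i += size) {
      out.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
    }
    return out;
  }

  /** Runs the simulation and returns how many drain cycles were needed. */
  static int run(FakeObserver observer, List<List<Integer>> chunks) {
    Iterator<List<Integer>> it = chunks.iterator();
    if (!it.hasNext()) {          // nothing to send: complete immediately
      observer.onCompleted();
      return 0;
    }
    observer.onReadyHandler = () -> sendUntilNotReady(observer, it);
    sendUntilNotReady(observer, it);
    int drains = 0;
    while (!observer.completed && it.hasNext()) {
      observer.drain();
      drains++;
    }
    return drains;
  }

  public static void main(String[] args) {
    List<Integer> items = new ArrayList<>();
    for (int i = 0; i < 120; i++) items.add(i);
    FakeObserver observer = new FakeObserver(2);
    int drains = run(observer, chunk(items, 50));
    System.out.println(observer.delivered.size() + " chunks, " + drains
        + " drain(s), completed=" + observer.completed);
  }
}
```

With 120 items in chunks of 50 and a capacity of 2, the first call sends two chunks and stops; the third chunk goes out only after the simulated ready callback, and onCompleted is called exactly once. In real gRPC code the drain step is not yours to call: the library invokes the handler registered with setOnReadyHandler.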