
I'm trying to set up a simple publish/subscribe pattern over gRPC using server streaming together with an async stub on the client. After implementing the part that streams messages back to the client, I wanted to handle connection drops. Right now I'm working on the case where the service is, for example, shut down, and the client should 'recover' from the loss of connection.

I've read and searched about the retry mechanism on Google/GitHub/SO and finally set up a retry policy for the service method that streams messages. As far as I understand, the retry mechanism should kick in when the service returns one of the retryableStatusCodes defined in the retry policy. After introducing the retry policy on the client I wanted to test it, and the results of the following two scenarios are what confuse me about retries.

First scenario:

  • connect is called (for ~n seconds, intentionally, no messages are streamed back to the client)
  • the service is shut down
  • onError is not called on the client
  • the service is up again
  • connect is reached again via retry

Second scenario:

  • connect is called (after ~n seconds the first message arrives and is processed in the client's onNext handler)
  • the service is shut down
  • onError is called on the client
  • the service is up again
  • connect is not reached again via retry

Overall, what confuses me is why the behavior differs between these two scenarios. Why does gRPC detect in the first scenario that the server returned UNAVAILABLE and attempt a retry, while in the second, with the same status, no retry happens?

Here is the code for the connect call on the client, the connect method on the service, and the retry policy setup on the client:

client:

messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
        //process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());

        messageService.broadcastMessage(message);
    }

    @Override
    public void onError(Throwable throwable) {
        //service went down
        LOGGER.error("Message stream failed", throwable);
    }

    @Override
    public void onCompleted() {
        //This method should be called when user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
});

service:

@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse> serverCallStreamObserver =
        (ServerCallStreamObserver<MessageResponse>) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated
}


@EventListener
public void listenForMessages(MessageEvent messageEvent) {
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build());
}

retryPolicy:

{
  "methodConfig" : [
    {
      "name": [
        {
          "service": "ch.example.proto.MessageService",
          "method": "connect"
        }
      ],
      "retryPolicy": {
        "maxAttempts": 10,
        "initialBackoff": "5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": ["UNAVAILABLE"]
      }
    }
  ]
}
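As a rough illustration of what this policy asks for: gRFC A6 defines the delay before the n-th retry as a random value in [0, min(initialBackoff * backoffMultiplier^(n-1), maxBackoff)). The standalone sketch below (a hypothetical RetryBackoff class, not part of grpc-java) computes those bounds for the values above. Note also that maxAttempts counts the original attempt, and grpc-java reduces any configured value above the channel's maxRetryAttempts limit (5 by default) to that limit.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical illustration of the gRFC A6 backoff rule for a retryPolicy.
 * Not grpc-java code; it only reproduces the arithmetic.
 */
final class RetryBackoff {
    private RetryBackoff() {}

    /** Upper bound (exclusive) of the delay before retry n (n = 1 is the first retry), in seconds. */
    static double maxDelaySeconds(int n, double initialBackoff, double maxBackoff, double multiplier) {
        double bound = initialBackoff * Math.pow(multiplier, n - 1);
        return Math.min(bound, maxBackoff); // capped by maxBackoff
    }

    /** Actual delay: uniformly random in [0, bound), i.e. full jitter as described in A6. */
    static double jitteredDelaySeconds(int n, double initialBackoff, double maxBackoff, double multiplier) {
        double bound = maxDelaySeconds(n, initialBackoff, maxBackoff, multiplier);
        return ThreadLocalRandom.current().nextDouble() * bound;
    }

    public static void main(String[] args) {
        // Values from the retryPolicy: initialBackoff 5s, maxBackoff 30s, backoffMultiplier 2
        // bounds: 5s, 10s, 20s, 30s, 30s
        for (int n = 1; n <= 5; n++) {
            System.out.printf("retry %d: delay in [0, %.0fs)%n", n, maxDelaySeconds(n, 5, 30, 2));
        }
    }
}
```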

Answer


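The problem is that receiving a message commits the RPC. This is discussed in gRFC A6 (Client Retries), which considers an RPC committed once the client receives the Response-Headers; the server sends those implicitly when it responds with the first message. In your first scenario the server had not sent anything yet, so the RPC was still uncommitted and gRPC could retry it when the connection dropped (which is also why onError was never called). In the second scenario the first message had already committed the RPC, so the UNAVAILABLE status was delivered to onError instead.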

Essentially, once gRPC has passed data back to the client, there is no way to retry automatically. If gRPC retried, how should it combine a new stream with what it has already delivered? Should it skip the first N responses? And what if the responses are different now? The problem is even worse for metadata (delivered via Response-Headers), since it cannot be provided to the client a second time.

gRPC is able to replay the client's requests to multiple backends, but once it starts receiving a response from a backend it will become "fixed" to that backend and be unable to change its decision.
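The commit rule can be summarized as a small decision function. The following is a simplified, hypothetical model (RetryDecision and its Code enum are illustrative names, not grpc-java classes) that ignores hedging, retry throttling, and server pushback, which A6 also covers; it only shows why the same UNAVAILABLE status is retried in one scenario and not in the other.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified, hypothetical model of the retry decision in gRFC A6.
 * Not grpc-java code: hedging, throttling and server pushback are ignored.
 */
final class RetryDecision {
    enum Code { UNAVAILABLE, INTERNAL }

    private final Set<Code> retryableStatusCodes;
    private final int maxAttempts; // includes the original attempt, as in A6
    private boolean committed;     // set once response headers (sent with the first message) arrive

    RetryDecision(Set<Code> retryableStatusCodes, int maxAttempts) {
        this.retryableStatusCodes = new HashSet<>(retryableStatusCodes);
        this.maxAttempts = maxAttempts;
    }

    /** Called when the first response data reaches the client; from now on the RPC is committed. */
    void onResponseHeaders() {
        committed = true;
    }

    /** Decides whether a failed attempt may be retried automatically. */
    boolean shouldRetry(Code status, int attemptsSoFar) {
        if (committed) {
            return false; // data already reached the application, so the status goes to onError
        }
        return retryableStatusCodes.contains(status) && attemptsSoFar < maxAttempts;
    }
}
```

Mapped onto the question: in the first scenario onResponseHeaders() is never called, so shouldRetry(UNAVAILABLE, 1) returns true; in the second, the first message has already committed the call, the same status yields false, and it surfaces in onError.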

You will need an application-level retry to re-establish the stream. When the client re-establishes the stream, it may need to modify the request to tell the server which messages it has already received.
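One way to implement that application-level retry is to reopen the stream from onError after a backoff, passing along the ID of the last message seen. The sketch below assumes a hypothetical MessageSource interface standing in for messageStub.connect(...) (so it runs without gRPC) and a hypothetical per-message ID that the server can resume from; MessageRequest in the question has no such field, so one would have to be added. Messages are reduced to their IDs and jitter is omitted for brevity.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for messageStub.withWaitForReady().connect(...).
 * A real implementation would put lastSeenId into the request and forward
 * StreamObserver.onNext/onError to the two callbacks.
 */
interface MessageSource {
    void subscribe(long lastSeenId, Consumer<Long> onMessage, Consumer<Throwable> onError);
}

/** Reopens the stream after a failure, resuming after the last message ID seen. */
final class ReconnectingSubscriber {
    private final MessageSource source;
    private final ScheduledExecutorService scheduler;
    private final Consumer<Long> handler;
    private final long initialBackoffMs;
    private final long maxBackoffMs;

    // Written from stream callbacks and read when reconnecting; volatile for visibility.
    private volatile long lastSeenId = 0;
    private volatile long backoffMs;

    ReconnectingSubscriber(MessageSource source, ScheduledExecutorService scheduler,
                           Consumer<Long> handler, long initialBackoffMs, long maxBackoffMs) {
        this.source = source;
        this.scheduler = scheduler;
        this.handler = handler;
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.backoffMs = initialBackoffMs;
    }

    void start() {
        connect();
    }

    private void connect() {
        source.subscribe(lastSeenId, this::onMessage, error -> scheduleReconnect());
    }

    private void onMessage(Long id) {
        lastSeenId = id;
        backoffMs = initialBackoffMs; // a healthy stream resets the backoff
        handler.accept(id);
    }

    private void scheduleReconnect() {
        long delay = backoffMs;
        backoffMs = Math.min(delay * 2, maxBackoffMs); // exponential backoff, capped
        scheduler.schedule(this::connect, delay, TimeUnit.MILLISECONDS);
    }
}
```

In a real client, subscribe would build a MessageRequest carrying the hypothetical last-seen ID and call connect with a StreamObserver that forwards onNext and onError to the two callbacks, while the handler does what the question's onNext does. Resetting the backoff on each message keeps a long-lived, healthy stream from inheriting a large delay from an earlier outage.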