I'm trying to set up a simple publish/subscribe pattern over gRPC using server streaming together with an async stub on the client. After implementing the part that streams messages back to the client, I wanted to handle connection drops. Right now I'm working on the case where the service is, for example, shut down and the client should 'recover' from the lost connection.
I've read and searched about the retry mechanism on Google/GitHub/SO and finally set up a retry policy for the service method that streams messages. As far as I understood, the retry mechanism should kick in when the service returns one of the retryableStatusCodes defined in the retry policy. After introducing the retry policy on the client I wanted to test it, and the results of the following two scenarios are what confuses me about retry.
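For reference, grpc-java can also take such a policy programmatically through `ManagedChannelBuilder.defaultServiceConfig(Map<String, ?>)` (typically together with `enableRetry()`). In that case the JSON policy shown at the end of this question becomes nested `Map`/`List` values, with JSON numbers represented as `Double` (which is what a parser such as Gson produces for untyped numbers). The sketch below is only an illustration under that assumption: the class name `RetryServiceConfig` is hypothetical, and it just builds the map with the JDK so the shape is visible; passing it to a real channel builder is not shown.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: the retry policy from this question expressed as the
// nested Map/List/Double/String structure a service config map consists of.
final class RetryServiceConfig {
    static Map<String, ?> forConnect() {
        Map<String, Object> name = Map.of(
                "service", "ch.example.proto.MessageService",
                "method", "connect");
        Map<String, Object> retryPolicy = Map.of(
                "maxAttempts", 10.0,           // JSON numbers are passed as Double
                "initialBackoff", "5s",
                "maxBackoff", "30s",
                "backoffMultiplier", 2.0,
                "retryableStatusCodes", List.of("UNAVAILABLE"));
        Map<String, Object> methodConfig = Map.of(
                "name", List.of(name),
                "retryPolicy", retryPolicy);
        return Map.of("methodConfig", List.of(methodConfig));
    }
}
```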
First scenario:
- connect is called (intentionally, no messages are streamed back to the client during the ~n seconds that follow)
- the service is shut down
- onError is not called on the client
- the service is up again
- connect is reached again via retry
Second scenario:
- connect is called (after ~n seconds the first message arrives and is processed in the onNext handler on the client)
- the service is shut down
- onError is called on the client
- the service is up again
- connect is not reached again via retry
Overall, what confuses me is why the behavior differs between these two scenarios. Why is it detected in the first scenario that the server returned UNAVAILABLE, so a retry is attempted, while in the second scenario, with the same status, no retry happens?
Here is the code for the connect call on the client, the connect method on the service, and the retry policy setup on the client:
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
        // process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());
        messageService.broadcastMessage(message);
    }
    @Override
    public void onError(Throwable throwable) {
        // service went down; pass the Throwable itself so the logger prints the stack trace
        LOGGER.error("Message streaming failed", throwable);
    }
    @Override
    public void onCompleted() {
        // This method should be called when the user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
});
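Since `onError` is terminal for a `StreamObserver` (no further callbacks arrive after it), one way for the client to 'recover' is to call connect again itself from `onError`. A minimal sketch, assuming a hypothetical `Resubscriber` helper (not part of grpc-java) whose `subscribe` Runnable would wrap the original `messageStub.withWaitForReady().connect(...)` call: each failure schedules one new attempt, the delay doubles up to a cap, and it resets once a message arrives again.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: re-issues a subscription after a capped, exponentially
// growing delay, and starts over from the initial delay once the stream is healthy.
final class Resubscriber {
    private final ScheduledExecutorService scheduler;
    private final Runnable subscribe;     // assumed to wrap the stub's connect(...) call
    private final long initialDelayMillis;
    private final long maxDelayMillis;
    private final double multiplier;
    private long nextDelayMillis;

    Resubscriber(ScheduledExecutorService scheduler, Runnable subscribe,
                 long initialDelayMillis, long maxDelayMillis, double multiplier) {
        this.scheduler = scheduler;
        this.subscribe = subscribe;
        this.initialDelayMillis = initialDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.multiplier = multiplier;
        this.nextDelayMillis = initialDelayMillis;
    }

    // Call from onError: schedules one new attempt and returns the delay used.
    synchronized long onStreamFailed() {
        long delay = nextDelayMillis;
        nextDelayMillis = Math.min(maxDelayMillis, (long) (nextDelayMillis * multiplier));
        scheduler.schedule(subscribe, delay, TimeUnit.MILLISECONDS);
        return delay;
    }

    // Call from onNext: the stream delivered data, so reset the backoff.
    synchronized void onMessageReceived() {
        nextDelayMillis = initialDelayMillis;
    }
}
```

In this sketch, `onError` would call `onStreamFailed()` and `onNext` would call `onMessageReceived()`; the scheduler is owned by the caller and should be shut down when the user logs out.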
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();
    ServerCallStreamObserver<MessageResponse> serverCallStreamObserver =
            (ServerCallStreamObserver<MessageResponse>) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    // responseObserver.onCompleted() is intentionally not called, so the stream stays open
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
    // omitted code (just data retrieval that populates the conn and message variables)....
    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());
    observer.onNext(builder.build());
}
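The service registers `serverCallStreamObserver` per user and later calls `onNext` from an `@EventListener`, possibly from more than one thread. grpc-java's `StreamObserver` is not thread-safe, so concurrent `onNext` calls on the same observer need to be serialized. A minimal sketch, assuming a hypothetical `ClientRegistry` standing in for `registerClient`, `getOnCancelHandler` and `conn.getResponseObserver()` (whose real implementations are omitted above): the sink would be `serverCallStreamObserver::onNext`, and the returned Runnable could be handed to `setOnCancelHandler(...)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical per-user registry of stream sinks with serialized sends.
final class ClientRegistry<M> {
    private final Map<Long, Consumer<M>> sinks = new ConcurrentHashMap<>();

    // Registers a sink and returns a cancel handler that removes exactly this sink,
    // so a stale cancel cannot drop a newer registration for the same user.
    Runnable register(long userId, Consumer<M> sink) {
        sinks.put(userId, sink);
        return () -> sinks.remove(userId, sink);
    }

    // Sends to one user; returns false when that user has no live stream.
    boolean send(long userId, M message) {
        Consumer<M> sink = sinks.get(userId);
        if (sink == null) {
            return false;
        }
        synchronized (sink) {   // one onNext at a time per stream
            sink.accept(message);
        }
        return true;
    }

    int size() {
        return sinks.size();
    }
}
```

Removing by key and value is a design choice for reconnects: if a user's new stream replaces the old one before the old call's cancel handler runs, the new stream stays registered.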
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}
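To see what delays this policy asks for: gRFC A6 (the gRPC retry design) specifies that the n-th retry waits a random time in `[0, min(initialBackoff * backoffMultiplier^(n-1), maxBackoff))`. The hypothetical `RetryBackoff` helper below computes those ceilings and draws a delay the same way; it is a sketch of the rule, not grpc-java's internal code. Note that grpc-java also caps `maxAttempts` at the channel's limit (5 by default, configurable with `ManagedChannelBuilder.maxRetryAttempts(int)`), so `"maxAttempts": 10` may behave like 5 unless that limit is raised.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper mirroring the gRFC A6 backoff rule for a retry policy.
final class RetryBackoff {
    // Upper bound (in seconds) of the random delay before each retry.
    // Attempt 1 is the original call, so there are maxAttempts - 1 retries.
    static List<Double> retryDelayCeilings(int maxAttempts, double initialBackoff,
                                           double maxBackoff, double multiplier) {
        List<Double> ceilings = new ArrayList<>();
        for (int retry = 1; retry < maxAttempts; retry++) {
            ceilings.add(Math.min(initialBackoff * Math.pow(multiplier, retry - 1), maxBackoff));
        }
        return ceilings;
    }

    // Actual delay: uniformly random in [0, ceiling).
    static double sampleDelay(double ceiling, Random random) {
        return random.nextDouble() * ceiling;
    }
}
```

With the default cap of 5 attempts, the four retries would wait at most 5s, 10s, 20s and 30s respectively.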