10
votes

I want to use gRPC to let clients subscribe to events generated by the server. I have an RPC declared like so:

rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);

where the returned stream is infinite. To "unsubscribe", clients cancel the RPC (btw. is there a cleaner way?).

I have figured out how the client can cancel the call:

Context.CancellableContext cancellableContext =
         Context.current().withCancellation();
cancellableContext.run(() -> {
   stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());

However, the server does not seem to notice that a client has cancelled its call. I'm testing this with a dummy server implementation:

@Override
public void subscribe(SubscribeRequest request,
                      StreamObserver<SubscribeResponse> responseObserver) {
  // in real code, this will happen in a separate thread.
  while (!Thread.interrupted()) {
    responseObserver.onNext(SubscribeResponse.getDefaultInstance());
  }
}

The server will happily continue sending its messages into the ether. How can the server recognize that the call was cancelled by the client and thus stop sending responses?

1

1 Answers

8
votes

I found the answer myself. You cast the StreamObserver passed to subscribe to a ServerCallStreamObserver, which exposes methods isCancelled and setOnCancelHandler.

scso = ((ServerCallStreamObserver<SubscribeResponse>) responseObserver);

scso.setOnCancelHandler(handler);
// or
if (scso.isCancelled()) {
  // do whatever
}

This, to me, begs the question why subscribe isn't passed a ServerCallStreamObserver to begin with.