I have a streaming service that indefinitely streams from the server to a client until the client cancels.

On the server side, I have a thread that populates an ehcache with data sourced from a database.

Ehcache provides callbacks on cache events, e.g., when an item is added or removed. I only care about notifying clients when an element is put into the cache, so when a client connects to my gRPC service, I register a listener with the cache whose notifyElementPut() callback holds a reference to the connected client's StreamObserver:

public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {


  // Observer for the connected client; typed as FooResponse to match the service method.
  private final StreamObserver<FooResponse> responseObserver;

  public GrpcAwareCacheEventListener(
      StreamObserver<FooResponse> responseObserver) {
    this.responseObserver = responseObserver;
  }


  @Override
  public void notifyElementPut(Ehcache cache, Element element) throws CacheException {

    Foo foo = (Foo) element.getObjectValue();
    if (foo != null) {
      responseObserver.onNext(
          FooResponse.newBuilder().setFoo(foo).build());
    }
  }
}

My streaming foo service is as follows:

    public void streamFooUpdates(Empty request,
        StreamObserver<FooResponse> responseObserver) {

      final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
      fooCache.getCacheEventNotificationService().registerListener(eventListener);
      Context.current().withCancellation().addListener(new CancellationListener() {

        public void cancelled(Context context) {
          log.info("inside context cancelled callback");
          fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
        }

      }, ForkJoinPool.commonPool());
    }

This all works fine: the client is notified of all foo updates for as long as it is connected.

However, after the client disconnects or explicitly cancels the call, I expect the cancellation listener on the server's Context to fire and unregister the callback from the cache.

This is not the case, regardless of whether the client shuts down the channel or explicitly cancels the call. (I expect the server-side cancellation listener to fire in both cases.) I'm wondering if my cancel semantics on the client side are incorrect. Here is my client code, taken from a test case:

Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
        .usePlaintext().build();

    FooServiceGrpc.FooService stub = FooServiceGrpc
        .newStub(channel);


    ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
      public void onNext(FooResponse response) {
        log.info("received foo: {}", response.getFoo());
      }

      public void onError(Throwable throwable) {

      }

      public void onCompleted() {

      }

      public boolean isReady() {
        return false;
      }

      public void setOnReadyHandler(Runnable runnable) {

      }

      public void disableAutoInboundFlowControl() {

      }

      public void request(int i) {

      }

      public void setMessageCompression(boolean b) {

      }

      public void cancel(@Nullable String s, @Nullable Throwable throwable) {

      }
    };

    stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
    Thread.sleep(10000); // sleep 10 seconds while messages are received.
    cancellableObserver.cancel("cancelling from test", null); //explicit cancel
    ((ManagedChannel) channel).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.

    Thread.sleep(7000); //channel should be shutdown by now.

  }

I'm wondering why the server is not firing the "Context cancelled" callback.

Thanks!

1 Answer

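You are not cancelling the client call correctly. The StreamObserver passed as the second argument of stub.streamFooUpdates() is your callback for receiving responses: gRPC calls it, and you shouldn't call anything on it yourself. In your test, cancel() on the anonymous ClientCallStreamObserver just runs the empty method body you wrote, so the cancellation never reaches the actual call.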

There are two ways to cancel the call from the client-side.

Option 1: Pass a ClientResponseObserver as the second argument and implement beforeStart(), which gives you a ClientCallStreamObserver on which you can call cancel().
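
A minimal sketch of Option 1, assuming grpc-stub is on the classpath. The class name CancellableResponseObserver and its cancel(String) helper are made up for illustration; only ClientResponseObserver, beforeStart() and ClientCallStreamObserver.cancel() come from gRPC itself.

```java
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

/**
 * Hypothetical helper: a response observer that remembers the
 * ClientCallStreamObserver gRPC hands to beforeStart(), so the caller
 * can cancel the RPC later.
 *
 * Possible usage with the stub from the question:
 *   CancellableResponseObserver<Empty, FooResponse> observer =
 *       new CancellableResponseObserver<>();
 *   stub.streamFooUpdates(Empty.newBuilder().build(), observer);
 *   observer.cancel("cancelling from test");
 */
public class CancellableResponseObserver<ReqT, RespT>
    implements ClientResponseObserver<ReqT, RespT> {

  // Set by gRPC before the call starts; volatile because cancel()
  // may be invoked from a different thread.
  private volatile ClientCallStreamObserver<ReqT> requestStream;

  @Override
  public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
    this.requestStream = requestStream;
  }

  @Override
  public void onNext(RespT response) {
    System.out.println("received: " + response);
  }

  @Override
  public void onError(Throwable t) {
    // After a client-side cancel, the call is expected to end here
    // with a CANCELLED status.
  }

  @Override
  public void onCompleted() {
    // The server closed the stream normally.
  }

  /** Cancels the underlying call; a no-op if the call has not started. */
  public void cancel(String reason) {
    ClientCallStreamObserver<ReqT> stream = requestStream;
    if (stream != null) {
      stream.cancel(reason, null);
    }
  }
}
```

The point of the design is that cancel() is invoked on the object gRPC supplies, not on an observer you implemented yourself, so the cancellation actually reaches the call and the server.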

Option 2: Run stub.streamFooUpdates() inside a CancellableContext, and cancel the Context to cancel the call. Note that a CancellableContext must always be cancelled eventually; that's what the finally block is for.

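CancellableContext withCancellation = Context.current().withCancellation();
try {
  // The call picks up the context that is current when it is started.
  withCancellation.run(() -> stub.streamFooUpdates(...));
  // Sleep outside the lambda: Runnable cannot throw InterruptedException.
  Thread.sleep(10000);
} finally {
  // Cancelling the context cancels the call and releases the context.
  withCancellation.cancel(null);
}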