0
votes

I have an async stub in which I added an observer:

            val obs =  object: StreamObserver<Hallo> {

                override fun onNext(value: Hallo) {

                    streamSuccess(value)
                }

                override fun onError(t: Throwable?) {

                    nonSuccess(t?.message ?: "Unknow error")
                }

                override fun onCompleted() {

                    Log.d("Info", "completed")
                    completed()
                }
            }

I would like a to be able to remove this observer from the async stub, so I can cancel the streaming in the client side.

As says in the github issue: https://github.com/grpc/grpc-java/issues/3095

I tried keeping a local variable of the observer, so the client can do later on:

observer?.onError(Status.CANCELLED.cause)

That didn't work.

Also I tried to create my own class from the abstract class: ClientCallStreamObserver

class CancellableStreamObserver<TResponse>(val next:(value:TResponse)->Unit, val onError:(t:Throwable)-> Unit, val onCompleted:(()->Unit), val onCanceledHandler: (()->Unit)? = null) : ClientCallStreamObserver<TResponse>() {
        override fun isReady(): Boolean {
            return  true
        }

        override fun setOnReadyHandler(onReadyHandler: Runnable?) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun disableAutoInboundFlowControl() {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun cancel(message: String?, cause: Throwable?) {

            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun request(count: Int) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun setMessageCompression(enable: Boolean) {
            //TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
        }

        override fun onNext(value: TResponse) {
            next(value)
        }

        override fun onError(t: Throwable) {
            if (t is StatusException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            if (t is StatusRuntimeException) {
                if (t.status.code == Status.Code.CANCELLED) {
                    onCanceledHandler?.let {
                        it()
                    }
                }
            }
            this.onError(t)
        }

        override fun onCompleted() {
            onCompleted()
        }
    }

So later on I can call:

        observer?.cancel("Cancelled for the user",Status.CANCELLED.cause)

That didn't work either.

The way I know it didn't work, it's because if the user adds again a new observer, I get duplicated responses, as if the old observer is still alive.

I know I can shutdown the channel with channel.shutdownNow(). But I think it's too aggressive.

Thanks

1

1 Answers

1
votes

From the referenced https://github.com/grpc/grpc-java/issues/3095:

for async you can use ClientCallStreamObserver.cancel() by casting the returned StreamObserver to ClientCallStreamObserver or implementing having your passed-in StreamObserver implement ClientResponseObserver.

(emphasis added)

grpc-java will implement the appropriate methods, not your instance. So the pattern would be:

stub.foo(req, object: ClientResponseObserver<Hallo> {
    override fun beforeStart(respObs: ClientCallStreamObserver<Hallo>) {
        // save respObs for later
    }
    override fun onNext(value: Hallo) {
        streamSuccess(value)
    }
    override fun onError(t: Throwable?) {
        nonSuccess(t?.message ?: "Unknow error")
    }
    override fun onCompleted() {
        Log.d("Info", "completed")
        completed()
    }
});

// -or- (for streaming calls only)

val obs = ...;
val respObs = stub.foo(obs) as (ClientCallStreamObserver<Hallo>);
respObs.onNext(req);
// save respObs for later

Note that the respObs in both cases would be identical. Using ClientResponseObserver would mainly be for when there is streaming and want to cancel within the response observer to avoid threading races.