I have an async stub to which I pass an observer:
val obs = object : StreamObserver<Hallo> {
    override fun onNext(value: Hallo) {
        streamSuccess(value)
    }

    override fun onError(t: Throwable?) {
        nonSuccess(t?.message ?: "Unknown error")
    }

    override fun onCompleted() {
        Log.d("Info", "completed")
        completed()
    }
}
I would like to be able to remove this observer from the async stub, so that I can cancel the streaming on the client side.
Following the suggestion in this GitHub issue: https://github.com/grpc/grpc-java/issues/3095
I tried keeping a reference to the observer in a local variable, so that the client can later call:
observer?.onError(Status.CANCELLED.cause)
That didn't work.
I also tried writing my own subclass of the abstract class ClientCallStreamObserver:
class CancellableStreamObserver<TResponse>(
    val next: (value: TResponse) -> Unit,
    val onError: (t: Throwable) -> Unit,
    val onCompleted: () -> Unit,
    val onCanceledHandler: (() -> Unit)? = null
) : ClientCallStreamObserver<TResponse>() {

    override fun isReady(): Boolean = true

    // The following overrides are empty stubs (not implemented).
    override fun setOnReadyHandler(onReadyHandler: Runnable?) {}
    override fun disableAutoInboundFlowControl() {}
    override fun cancel(message: String?, cause: Throwable?) {}
    override fun request(count: Int) {}
    override fun setMessageCompression(enable: Boolean) {}

    override fun onNext(value: TResponse) {
        next(value)
    }

    override fun onError(t: Throwable) {
        // Notify the cancel handler when the failure carries a CANCELLED status.
        val code = when (t) {
            is StatusException -> t.status.code
            is StatusRuntimeException -> t.status.code
            else -> null
        }
        if (code == Status.Code.CANCELLED) {
            onCanceledHandler?.invoke()
        }
        // Call the callback property explicitly; a bare onError(t) resolves to
        // this method and recurses forever.
        onError.invoke(t)
    }

    override fun onCompleted() {
        // Same here: invoke the callback property, not this method.
        onCompleted.invoke()
    }
}
So later on I can call:
observer?.cancel("Cancelled for the user",Status.CANCELLED.cause)
That didn't work either.
I know it didn't work because when the user later adds a new observer, I get duplicated responses, as if the old observer were still alive.
I know I can shut down the channel with channel.shutdownNow(), but I think that's too aggressive.
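For reference, a less aggressive option that the linked issue appears to point to is to let gRPC hand over the call handle, rather than constructing a ClientCallStreamObserver by hand. A response observer that implements ClientResponseObserver receives the real ClientCallStreamObserver in beforeStart, and calling cancel on that handle should cancel only that one call. The following is a minimal sketch under that assumption; the class name CancellableResponseObserver and its callback parameters (next, failed, done) are hypothetical and not part of gRPC:

```kotlin
import io.grpc.stub.ClientCallStreamObserver
import io.grpc.stub.ClientResponseObserver

// Hypothetical helper: keeps the call handle that gRPC provides before the
// call starts, so a single call can be cancelled without touching the channel.
class CancellableResponseObserver<ReqT, RespT>(
    private val next: (RespT) -> Unit,
    private val failed: (Throwable) -> Unit,
    private val done: () -> Unit
) : ClientResponseObserver<ReqT, RespT> {

    @Volatile
    private var call: ClientCallStreamObserver<ReqT>? = null

    // Invoked by the stub before the call starts; this is the real call handle.
    override fun beforeStart(requestStream: ClientCallStreamObserver<ReqT>) {
        call = requestStream
    }

    override fun onNext(value: RespT) = next(value)

    override fun onError(t: Throwable) = failed(t)

    override fun onCompleted() = done()

    // Cancels this call only; does nothing if the call has not started yet.
    fun cancel(reason: String) {
        call?.cancel(reason, null)
    }
}
```

An instance of this class would be passed to the async stub in place of the plain StreamObserver, and cancel("...") called on it when the user stops the stream. As far as I understand, the cancellation is then reported back through onError with a CANCELLED status, so the failed callback may want to treat that status as a normal stop rather than an error.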
Thanks