
UPDATE 2 - yep, the following extension function does what's needed.

suspend fun Method.invokeSuspend(obj: Any, vararg args: Any?): Any? =
        kotlinFunction!!.callSuspend(obj, *args)

It would be nice if the library doc for callSuspend

https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.reflect.full/call-suspend.html

explicitly stated that the receiver, if applicable, comes first in the vararg list. But I'm happy it's now possible to do this in 1.3. And it's baked into the Kotlin API now, so you no longer have to use the reflective hack of pulling out the backing continuation and invoking the transformed Java method through the Java reflection API.


UPDATE - I can see from another Stack Overflow question that Kotlin 1.3 has KFunction.callSuspend. Does anyone know whether it can be used in my case and invoked against a reflective method? If so, how should it be called?

val ret = method.kotlinFunction?.callSuspend(/*???*/)

How do you bind the target object? method.invoke takes the target followed by varargs for the method parameters, but callSuspend only takes varargs.

Or is callSuspend just for standalone functions?


I'm writing a fairly sophisticated remoting framework in Kotlin where a class implementing an interface (with annotations similar to JAX-RS) can be efficiently remoted over several different transports, including HTTP/2 and Vert.x, and called through a stub proxy implementing the interface, so it's completely transparent to the calling code. There are reasons I'm writing a custom implementation which I don't need to get into. Everything's based on suspending functions and coroutines, which are awesome.

To do this, the Kotlin interface is used to auto-generate a transparent proxy stub on the client side and a dispatcher on the endpoint side. The dispatcher automatically enforces security by looking at security annotations on the interface methods. Identity data can be accessed from the implementation code through the coroutine context.

Everything's working, except that the dispatcher obviously has to use reflection to invoke the suspending function on the implementing class. I cannot figure out how to propagate the coroutine context across the reflective suspending invocation. Not only that, the default thread pool for coroutines doesn't seem to be used either - instead it uses some fork-join pool.

Coroutines are implemented really well in my opinion, but when you start doing the low-level stuff you can't avoid the ugly underbelly. The other thing I noticed is that a default method in a Kotlin interface doesn't map to a default method in the underlying generated Java interface. That also caused me some grief, but that's a separate issue.

Anyway, does anyone know how to fix this final issue? Thanks.


// attach an extension function to Method
suspend fun Method.invokeSuspend(obj: Any,  vararg args: Any?): Any? =
            suspendCoroutine { cont ->
                println("in thread "+Thread.currentThread().name)
                val ret=invoke(obj, *args, cont)
                cont.resume(ret)
            }

//....

withContext(kc) {
    // kc NOT propagated through method invocation...
    meth.invokeSuspend(rec.ob, args)!!
}


2 Answers

0
votes
suspend fun Method.invokeSuspend(obj: Any,  vararg args: Any?): Any? =
            suspendCoroutine { cont ->
                val ret=invoke(obj, *args, cont)
                cont.resume(ret)
            }

There are two main mistakes here:

  1. you expect invoke() to return the value that you must resume the continuation with
  2. you call the user-level suspendCoroutine function instead of the low-level suspendCoroutineUninterceptedOrReturn

Behind these mistakes there may be a deeper misunderstanding of the coroutine suspension mechanism, so let me try to elaborate on that. This is a way to correct your code, taken from the implementation of KCallable.callSuspend:

suspend fun Method.invokeSuspend(obj: Any,  vararg args: Any?): Any? =
    suspendCoroutineUninterceptedOrReturn { cont -> invoke(obj, *args, cont) }

Note the main feature of this code: it just passes the continuation to the invoked function and never tries to resume it with the result of the invocation.

Now, how does this manage to work? There are two factors:

  1. If the called suspendable function doesn't actually suspend, it will simply return its result, and this will become the result of your invokeSuspend function.
  2. If it does suspend, when it resumes, the suspendable function will on its own use the continuation you passed in and invoke its resume method with the result.

If it decides to suspend, the suspendable function immediately returns the special COROUTINE_SUSPENDED constant. suspendCoroutineUninterceptedOrReturn interprets this value as necessary to cooperate with the coroutine suspension mechanism. Specifically, it makes your function return the same constant to its caller (it does this whether or not your code actually returns the result of the suspendable function). This way the constant propagates all the way up the call stack until it reaches the non-suspendable function that started the coroutine. This is typically an event loop, and now it will be able to go on processing the next event.
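The two paths described above can be sketched with nothing but the standard library. This is an illustration under stated assumptions, not the asker's framework: Identity, Greeter, and demoSuspension are hypothetical names, and the coroutine is started by hand with startCoroutine so that no dispatcher is involved. It shows the context element being visible inside the reflectively invoked function both when the function returns directly and when it really suspends.

```kotlin
import java.lang.reflect.Method
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical context element standing in for the identity data in the question.
class Identity(val user: String) : AbstractCoroutineContextElement(Identity) {
    companion object Key : CoroutineContext.Key<Identity>
}

// Hypothetical service class, used only for illustration.
class Greeter {
    // Continuation parked by `parked` until the caller resumes it.
    var pending: Continuation<Unit>? = null

    // Never suspends: the result is returned directly.
    suspend fun direct(name: String): String =
        "hi $name from ${coroutineContext[Identity]?.user}"

    // Really suspends: returns COROUTINE_SUSPENDED first, resumes later.
    suspend fun parked(name: String): String {
        suspendCoroutine<Unit> { pending = it }
        return "hi $name from ${coroutineContext[Identity]?.user}"
    }
}

// Pass the continuation straight through and never resume it ourselves.
suspend fun Method.invokeSuspend(obj: Any, vararg args: Any?): Any? =
    suspendCoroutineUninterceptedOrReturn { cont -> invoke(obj, *args, cont) }

fun demoSuspension(): List<String> {
    val results = mutableListOf<String>()
    val greeter = Greeter()
    // A suspend function compiles to a Java method with a trailing Continuation parameter.
    val direct = Greeter::class.java.getMethod("direct", String::class.java, Continuation::class.java)
    val parked = Greeter::class.java.getMethod("parked", String::class.java, Continuation::class.java)

    val body: suspend () -> Unit = {
        results += direct.invokeSuspend(greeter, "a") as String
        results += parked.invokeSuspend(greeter, "b") as String
    }
    // Start the coroutine with an Identity element in its context.
    body.startCoroutine(Continuation(Identity("alice")) { it.getOrThrow() })

    // startCoroutine has returned because `parked` suspended.
    results += "suspended"
    greeter.pending!!.resume(Unit)
    return results
}
```

Calling demoSuspension() should give the direct result first, then the "suspended" marker, then the result delivered through the continuation, with the identity visible in both calls.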


How do you bind the target object? method.invoke takes the target followed by varargs for the method parameters, but callSuspend only takes varargs.

The answer to this is documented under KCallable.parameters:

/**
 * Parameters required to make a call to this callable.
 * If this callable requires a `this` instance or an extension receiver parameter,
 * they come first in the list in that order.
 */
public val parameters: List<KParameter>

So this is how simple it is to implement your invokeSuspend in terms of KCallable.callSuspend:

suspend fun Method.invokeSuspend(obj: Any, vararg args: Any?): Any? =
        kotlinFunction!!.callSuspend(obj, *args)
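As a usage sketch, here is how that extension might be called from a dispatcher-like piece of code. Calculator, CalculatorImpl, and callAdd are hypothetical names invented for the example, the kotlin-reflect library is assumed to be on the classpath, and the coroutine is started by hand with startCoroutine to keep the block free of other dependencies.

```kotlin
import java.lang.reflect.Method
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.reflect.full.callSuspend
import kotlin.reflect.jvm.kotlinFunction

// Hypothetical remoted interface and its implementation.
interface Calculator {
    suspend fun add(a: Int, b: Int): Int
}

class CalculatorImpl : Calculator {
    override suspend fun add(a: Int, b: Int): Int = a + b
}

// The receiver goes first in the vararg list, followed by the declared parameters.
suspend fun Method.invokeSuspend(obj: Any, vararg args: Any?): Any? =
    kotlinFunction!!.callSuspend(obj, *args)

fun callAdd(): Any? {
    // The Java view of the suspend function has a trailing Continuation parameter.
    val method = Calculator::class.java.getMethod(
        "add", Int::class.java, Int::class.java, Continuation::class.java
    )
    var value: Any? = null
    val body: suspend () -> Any? = { method.invokeSuspend(CalculatorImpl(), 2, 3) }
    // `add` never suspends, so the completion callback runs before startCoroutine returns.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { value = it.getOrThrow() })
    return value
}
```

Note that only the receiver and the two Int arguments are passed; callSuspend supplies the continuation itself.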
0
votes

Repository.kt

suspend fun getTestRepository(): X
suspend fun getTestWithParamRepository(a: String): X

Service.kt

fun getTestService(lambda: suspend () -> X) { /* ... */ }

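Usage

// A function without parameters can be passed as a callable reference
getTestService(repository::getTestRepository)

// Calling the function with a parameter directly as the argument fails with:
// Suspend function 'getTestWithParamRepository' should be called only from a coroutine or another suspend function
// Surround with lambda
getTestService { repository.getTestWithParamRepository("") }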
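A self-contained version of this snippet might look like the sketch below. The bodies of X, Repository, and getTestService are assumptions made for illustration (the original leaves them elided), and getTestService is changed to return the X it obtains so that the result can be inspected.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical result type and repository; the original does not show them.
class X(val value: String)

class Repository {
    suspend fun getTestRepository(): X = X("no-arg")
    suspend fun getTestWithParamRepository(a: String): X = X("arg=$a")
}

// Assumed implementation: runs the suspend lambda and hands back its result.
fun getTestService(lambda: suspend () -> X): X {
    var result: X? = null
    // Neither repository function suspends, so the callback runs synchronously.
    lambda.startCoroutine(Continuation(EmptyCoroutineContext) { result = it.getOrThrow() })
    return result!!
}

fun demoReferences(): List<String> {
    val repository = Repository()
    return listOf(
        // A bound reference to a parameterless suspend function fits suspend () -> X.
        getTestService(repository::getTestRepository).value,
        // A function with parameters has to be wrapped in a suspend lambda.
        getTestService { repository.getTestWithParamRepository("") }.value
    )
}
```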

GL