Kotlin suspend functions should be nonblocking by convention (1). Often we have old Java code which relies on the Java thread interruption mechanism and which we cannot (or don't want to) modify (2):
public void doSomething(String arg) {
    for (int i = 0; i < 100_000; i++) {
        heavyCrunch(arg, i);
        if (Thread.interrupted()) {
            // We've been interrupted: no more crunching.
            return;
        }
    }
}
What is the best way to adapt this code for usage in coroutines?
Version A is unacceptable because it runs the blocking code on the caller's thread, which violates the "suspending functions do not block the caller thread" convention:
suspend fun doSomething(param: String) = delegate.performBlockingCode(param)
Version B is better because it runs the blocking function on a background thread, so it doesn't block the caller's thread (unless the caller happens to be running on a thread from the Dispatchers.Default pool). However, cancelling the coroutine's job wouldn't interrupt performBlockingCode(), which relies on thread interruption:
suspend fun doSomething(param: String) = withContext(Dispatchers.Default) {
    delegate.performBlockingCode(param)
}
Version C is currently the only way I see to make it work. The idea is to convert the blocking function into a nonblocking one with Java mechanisms and then use suspendCancellableCoroutine (3) to convert the asynchronous method into a suspend function:
private ExecutorService executor = Executors.newSingleThreadExecutor();

public Future doSomethingAsync(String arg) {
    return executor.submit(() -> {
        doSomething(arg);
    });
}
suspend fun doSomething(param: String) = suspendCancellableCoroutine<Any> { cont ->
    try {
        val future = delegate.doSomethingAsync(param)
        // Registered inside the try block so that `future` is in scope.
        cont.invokeOnCancellation { future.cancel(true) }
    } catch (e: InterruptedException) {
        throw CancellationException()
    }
}
As commented below, the code above won't work properly, because continuation.resumeWith() is never called.
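The cancellation half of Version C depends on Future.cancel(true) interrupting the thread that is running the submitted task. The following is a minimal sketch of that behaviour in isolation, not part of the original code: the class name FutureCancelDemo and the method cancelInterruptsTask are made up for illustration, and a timed spin loop stands in for the heavyCrunch loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; the names are illustrative, not from the question.
class FutureCancelDemo {

    /** Returns true if the running task observed an interrupt after cancel(true). */
    static boolean cancelInterruptsTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                // Stand-in for the crunching loop: poll the interrupt flag,
                // giving up after 5 seconds if no interrupt ever arrives.
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                while (System.nanoTime() < deadline) {
                    if (Thread.interrupted()) {
                        sawInterrupt.set(true);
                        break;
                    }
                }
                finished.countDown();
            });
            started.await();      // make sure the task is actually running
            future.cancel(true);  // true = interrupt the thread running the task
            boolean done = finished.await(10, TimeUnit.SECONDS);
            return done && sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task saw interrupt: " + cancelInterruptsTask());
    }
}
```

This only covers cancellation. Resuming the continuation when the task completes still needs a completion callback, which a plain Future does not offer.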
Version D uses CompletableFuture, which provides a way to register a callback that runs when the future completes: thenAccept
private ExecutorService executor = Executors.newSingleThreadExecutor();

public CompletableFuture doSomethingAsync(String arg) {
    return CompletableFuture.runAsync(() -> doSomething(arg), executor);
}
suspend fun doSomething(param: String) = suspendCancellableCoroutine<Any> { cont ->
    try {
        val completableFuture = delegate.doSomethingAsync(param)
        completableFuture.thenAccept { cont.resumeWith(Result.success(it)) }
        cont.invokeOnCancellation { completableFuture.cancel(true) }
    } catch (e: InterruptedException) {
        throw CancellationException()
    }
}
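One caveat about Version D: according to the CompletableFuture Javadoc, the mayInterruptIfRunning argument of cancel() has no effect in that class, so completableFuture.cancel(true) marks the future as cancelled but does not interrupt the thread running doSomething(). The sketch below shows one possible workaround on the Java side, under stated assumptions: the task is submitted through ExecutorService.submit(), and cancellation of the CompletableFuture is forwarded to that Future with cancel(true). The names InterruptibleFutures, runInterruptibly and taskSeesInterrupt are hypothetical helpers, not JDK or kotlinx.coroutines API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class; the names are illustrative only.
class InterruptibleFutures {

    /** Runs the task on the executor; cancelling the returned future interrupts the worker. */
    static CompletableFuture<Void> runInterruptibly(Runnable task, ExecutorService executor) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Future<?> running = executor.submit(() -> {
            try {
                task.run();
                result.complete(null);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        // Forward cancellation of the CompletableFuture as a real thread interrupt.
        result.whenComplete((value, error) -> {
            if (result.isCancelled()) {
                running.cancel(true);
            }
        });
        return result;
    }

    /** Cancels a running spin-loop task and reports whether it observed an interrupt. */
    static boolean taskSeesInterrupt(boolean useBridge) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Runnable task = () -> {
            started.countDown();
            // Poll the interrupt flag for at most one second.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (System.nanoTime() < deadline) {
                if (Thread.interrupted()) {
                    sawInterrupt.set(true);
                    break;
                }
            }
            finished.countDown();
        };
        try {
            CompletableFuture<Void> future = useBridge
                    ? runInterruptibly(task, executor)
                    : CompletableFuture.runAsync(task, executor);
            started.await();
            future.cancel(true);
            finished.await();
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("plain runAsync, interrupt seen: " + taskSeesInterrupt(false));
        System.out.println("bridged future, interrupt seen: " + taskSeesInterrupt(true));
    }
}
```

With a helper along these lines, doSomethingAsync could return runInterruptibly(() -> doSomething(arg), executor), and the Kotlin wrapper from Version D would stay unchanged. Because the interrupt targets a thread of the dedicated executor rather than a coroutine dispatcher thread, it should not reach unrelated coroutines.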
Do you know a better way to do this?
... Future is the blocking get() or join(). You need a CompletableFuture: supplyAsync(fn, executor). – Marko Topolnik
... CompletableFuture.await(). Alas, it is hardcoded to call future.cancel(false). But I think you could simply write your own extension differently. Writing an extension on CompletableFuture is more composable with other code and could relieve you from writing a suspendable wrapper every time. Keep in mind that interrupting coroutine-carrying threads is in general a dangerous practice and could cause the interrupt signal to be received by an unintended recipient. – Marko Topolnik
... CompletableFuture I'd go with that. – Marko Topolnik