39
votes

How does Kotlin implement coroutines internally?

Coroutines are said to be a "lighter version" of threads, and I understand that they use threads internally to execute coroutines.

What happens when I start a coroutine using any of the builder functions?

This is my understanding of running this code:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin has a pre-defined ThreadPool at the beginning.
  2. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).
  3. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
  4. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
  5. (C) executes on Thread03.
  6. At (D), the Thread03 is stopped.
  7. After 1000ms, (E) is executed on the next free thread, say Thread01.

Am I understanding this correctly? Or are coroutines implemented in a different way?


Update (2021): Here's an excellent article by Manuel Vivo that complements all the answers below.

2
I am trying to build a diagram (or an animation) of how things work internally, something that looks like this: en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg - Vishnu Haridas

2 Answers

48
votes

Coroutines are a completely separate thing from any scheduling policy that you describe. A coroutine is basically a call chain of suspend funs. Suspension is totally under your control: you just have to call suspendCoroutine. You'll get a callback object so you can call its resume method and get back to where you suspended.

Here's some code where you can see that suspension is a very direct and transparent mechanism, fully under your control:

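import kotlin.coroutines.*
import kotlinx.coroutines.*

// Holds the continuation of the suspended coroutine so main() can resume it.
var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    // Unconfined: the coroutine starts right here, on the calling thread.
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    // Each resume() runs the coroutine in place until it suspends again.
    continuation!!.resume("Resumed first time")
    continuation!!.resume("Resumed second time")
}

// Suspends the caller and hands its continuation to the outside world.
suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}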

All the code above executes on the same, main thread. There is no multithreading at all going on.

The coroutine you launch suspends itself each time it calls suspendHere(). It writes the continuation callback to the continuation property, and then you explicitly use that continuation to resume the coroutine.

The code uses the Unconfined coroutine dispatcher, which does no dispatching to threads at all: it just runs the coroutine code right where you invoke continuation.resume().


With that in mind, let's revisit your diagram:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin has a pre-defined ThreadPool at the beginning.

It may or may not have a thread pool. A UI dispatcher works with a single thread.

The prerequisite for a thread to be the target of a coroutine dispatcher is that there is a concurrent queue associated with it and the thread runs a top-level loop that takes Runnable objects from this queue and executes them. A coroutine dispatcher simply puts the continuation on that queue.
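To make the queue-plus-loop idea concrete, here is a minimal sketch that uses only the standard library's kotlin.coroutines package. QueueDispatcher, drain and runQueueDemo are hypothetical names invented for this illustration; a real dispatcher in kotlinx.coroutines is more involved, and here the "top-level loop" is simply called by hand on the current thread.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical dispatcher: resuming a continuation only enqueues a Runnable.
class QueueDispatcher(private val queue: ConcurrentLinkedQueue<Runnable>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                queue.add(Runnable { continuation.resumeWith(result) })
            }
        }
}

// Stand-in for the thread's top-level loop: run queued tasks until none are left.
fun drain(queue: ConcurrentLinkedQueue<Runnable>) {
    while (true) {
        val task = queue.poll() ?: return
        task.run()
    }
}

fun runQueueDemo(): List<String> {
    val log = mutableListOf<String>()
    val queue = ConcurrentLinkedQueue<Runnable>()
    var saved: Continuation<Unit>? = null

    val body: suspend () -> Unit = {
        log += "coroutine started"
        suspendCoroutine<Unit> { saved = it }
        log += "coroutine resumed"
    }
    // Starting the coroutine goes through the dispatcher, so nothing runs yet.
    body.startCoroutine(Continuation(QueueDispatcher(queue)) { log += "coroutine completed" })
    log += "start returned"
    drain(queue)            // runs the coroutine up to its suspension point
    saved!!.resume(Unit)    // only enqueues the rest of the coroutine
    log += "resume returned"
    drain(queue)            // runs the coroutine to completion
    return log
}

fun main() {
    runQueueDemo().forEach { println(it) }
}
```

Note that both starting and resuming the coroutine return immediately; the coroutine body only makes progress when the loop picks its Runnable off the queue.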

  2. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).

It can also be the same thread where you called launch.

  3. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).

Kotlin has no need to stop any threads in order to suspend a coroutine. In fact, the main point of coroutines is that threads don't get started or stopped. The thread's top-level loop will go on and pick another runnable to run.

Furthermore, the mere fact that you're calling a suspend fun has no significance. The coroutine will only suspend itself when it explicitly calls suspendCoroutine. The function may also simply return without suspension.

But let's assume it did call suspendCoroutine. In that case the coroutine is no longer running on any thread. It is suspended and can't continue until some code, somewhere, calls continuation.resume(). That code could be running on any thread, any time in the future.
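Both cases can be seen side by side in a small sketch built on the standard library alone. The names loadCached, loadLater, pending, start and runSuspensionDemo are made up for this illustration, and no dispatcher is installed, so the coroutine simply continues on whichever thread calls resume().

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical suspend fun that never reaches a suspension point.
suspend fun loadCached(): String = "cached"

// Holds the continuation of the coroutine currently parked in loadLater().
var pending: Continuation<String>? = null

// Hypothetical suspend fun that always suspends by stashing its continuation.
suspend fun loadLater(): String = suspendCoroutine { pending = it }

// Starts `block` as a coroutine with no dispatcher and reports its result.
fun start(block: suspend () -> String, onDone: (String) -> Unit) =
    block.startCoroutine(Continuation(EmptyCoroutineContext) { onDone(it.getOrThrow()) })

fun runSuspensionDemo(): List<String> {
    val log = mutableListOf<String>()

    // Case 1: the suspend fun returns like an ordinary function.
    var first: String? = null
    start({ loadCached() }) { first = it }
    log += "loadCached finished in place: ${first != null}"

    // Case 2: the coroutine suspends, and start() returns with nothing running.
    var second: String? = null
    start({ loadLater() + " / continued on " + Thread.currentThread().name }) { second = it }
    log += "loadLater finished in place: ${second != null}"

    // Some other thread resumes it later; the coroutine continues on that thread.
    thread(name = "resumer") { pending!!.resume("data") }.join()
    log += "after resume: $second"
    return log
}

fun main() {
    runSuspensionDemo().forEach { println(it) }
}
```

Between the second start() call and the resume(), the coroutine exists only as the object stored in pending; no thread is blocked or parked on its behalf.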

  4. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).

B doesn't "return after execution"; the coroutine resumes while still inside its body. It may suspend and resume any number of times before returning.

  5. (C) executes on Thread03.
  6. At (D), the Thread03 is stopped.
  7. After 1000ms, (E) is executed on the next free thread, say Thread01.

Again, no threads are being stopped. The coroutine gets suspended and a mechanism, usually specific to the dispatcher, is used to schedule its resumption after 1000 ms. At that point it will be added to the run queue associated with the dispatcher.
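As a rough illustration of such a mechanism, a delay-like function can be sketched with a ScheduledExecutorService and suspendCoroutine. This is not how kotlinx.coroutines implements delay(); sleepFor, timer and runDelayDemo are hypothetical names, and with no dispatcher in the context the coroutine just continues on the timer thread.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// One daemon thread that fires scheduled tasks; it does not keep the JVM alive.
val timer = Executors.newSingleThreadScheduledExecutor { r ->
    Thread(r, "timer").apply { isDaemon = true }
}

// Hypothetical stand-in for delay(): suspends the coroutine and asks the
// scheduler to resume it later. No thread sleeps in the meantime.
suspend fun sleepFor(millis: Long): Unit = suspendCoroutine { cont ->
    timer.schedule(Runnable { cont.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
}

fun runDelayDemo(): Long {
    val finished = CountDownLatch(1)
    val start = System.nanoTime()
    var elapsedMs = 0L

    val body: suspend () -> Unit = {
        sleepFor(200)
        elapsedMs = (System.nanoTime() - start) / 1_000_000
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { finished.countDown() })
    // startCoroutine has already returned here; the caller was not blocked by
    // sleepFor. We block below only so the demo can report the result.
    finished.await()
    return elapsedMs
}

fun main() {
    println("resumed after ${runDelayDemo()} ms")
}
```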


For specificity, let's see some examples of what kind of code it takes to dispatch a coroutine.

Swing UI dispatcher:

EventQueue.invokeLater { continuation.resume(value) }

Android UI dispatcher:

mainHandler.post { continuation.resume(value) }

ExecutorService dispatcher:

executor.submit { continuation.resume(value) }

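To show where a one-liner like the ExecutorService variant would live, here is a hedged sketch of a dispatcher written as a ContinuationInterceptor from the standard library. ExecutorDispatcher and runOnExecutor are names invented for this example; kotlinx.coroutines ships its own, more complete dispatchers.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical dispatcher: every resumption is handed to the executor.
class ExecutorDispatcher(private val executor: ExecutorService) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts a coroutine with the dispatcher and reports which thread ran its body.
fun runOnExecutor(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "pool-worker") }
    val done = CountDownLatch(1)
    var ranOn = ""

    val body: suspend () -> Unit = { ranOn = Thread.currentThread().name }
    body.startCoroutine(Continuation(ExecutorDispatcher(executor)) { done.countDown() })

    done.await()
    executor.shutdown()
    return ranOn
}

fun main() {
    println("coroutine body ran on: ${runOnExecutor()}")
}
```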
17
votes

Coroutines work by creating a switch over possible resume points:

class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch(super.state) {
        default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
        case 0:  {
             // code before first suspension
             state = 1; // or something else depending on your branching
             break;
        }
        case 1: {
            ...
        }
        }
        return null;
    }
}

The code that executes this coroutine creates an instance of that class and calls its doResume() function every time it needs to resume execution. How that is handled depends on the scheduler used for execution.
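The same idea can be imitated by hand in plain Kotlin. The sketch below is only an analogy for the generated class, not compiler output: PrintStateMachine and runStateMachine are hypothetical names, and the Boolean return value stands in for the "suspended" marker that the real generated code returns.

```kotlin
// Hand-written imitation of a coroutine body equivalent to:
//   println("Before"); <suspension point>; println("After")
class PrintStateMachine(private val out: MutableList<String>) {
    // Remembers which resume point to jump to on the next call.
    private var label = 0

    // Returns true if the machine suspended, false if it ran to completion.
    fun resume(): Boolean = when (label) {
        0 -> {
            out += "Before"   // code before the first suspension
            label = 1         // where to continue next time
            true
        }
        1 -> {
            out += "After"    // code after the suspension
            label = 2
            false
        }
        else -> throw IllegalStateException("already completed")
    }
}

fun runStateMachine(): List<String> {
    val out = mutableListOf<String>()
    val machine = PrintStateMachine(out)

    val suspended = machine.resume()   // runs up to the suspension point
    out += "scheduler decides when to continue (suspended=$suspended)"
    machine.resume()                   // continues from label 1
    return out
}

fun main() {
    runStateMachine().forEach { println(it) }
}
```

Whoever holds the machine object plays the role of the scheduler: it chooses when, and on which thread, to call resume() again.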

Here is an example compilation for a simple coroutine:

launch {
    println("Before")
    delay(1000)
    println("After")
}

This compiles to the following bytecode:

private kotlinx.coroutines.experimental.CoroutineScope p$;

public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
   0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
   3: astore        5
   5: aload_0
   6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
   9: tableswitch   { // 0 to 1
                 0: 32
                 1: 77
           default: 102
      }
  32: aload_2
  33: dup
  34: ifnull        38
  37: athrow
  38: pop
  39: aload_0
  40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
  43: astore_3
  44: ldc           #26                 // String Before
  46: astore        4
  48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  51: aload         4
  53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  56: sipush        1000
  59: aload_0
  60: aload_0
  61: iconst_1
  62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
  65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
  68: dup
  69: aload         5
  71: if_acmpne     85
  74: aload         5
  76: areturn
  77: aload_2
  78: dup
  79: ifnull        83
  82: athrow
  83: pop
  84: aload_1
  85: pop
  86: ldc           #46                 // String After
  88: astore        4
  90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  93: aload         4
  95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
 101: areturn
 102: new           #54                 // class java/lang/IllegalStateException
 105: dup
 106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine
 108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
 111: athrow

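I compiled this with kotlinc 1.2.41, when coroutines were still experimental (hence the "experimental" package names). Later compiler versions generate differently named classes and methods, but the state-machine structure is the same.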

Offsets 32 to 76 hold the code for printing Before and calling delay(1000), which suspends.

Offsets 77 to 101 hold the code for printing After.

Offsets 102 to 111 hold the error handling for illegal resume states, as denoted by the default label in the switch table.

So, in summary, coroutines in Kotlin are simply state machines that are controlled by some scheduler.