Kotlin Coroutines Under the Hood

Most likely, you were asked at a job interview “how do coroutines work under the hood?”, you without thinking twice threw out something like “there is a state machine under the hood, it determines which suspend function will be executed”, but did you really understand everything that was being said? Perhaps only you know this, but to be honest, I understood my own answers to such questions very poorly, as paradoxical as it may sound, and even after a dozen interviews I did not have a complete picture of how the inside of this truly incredible library of “sweet asynchronousness” works.

Okay, the introduction turned out to be too drawn out, as it happens in unprofessional films from which you expect action for half the running time, and then you get dull fights with glow sticks, let's go and figure it out!

Introducing the Cool Guys: CoroutineContext and CoroutineScope

Let's start with a simple example:

fun main() = runBlocking {
    // запускаем новую корутину
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

For now we are only interested in the function launchwhich is most often used to create coroutines, let's dive into its source code:

// (1) launch является Kotlin Extension функцией для CoroutineScope
fun CoroutineScope.launch(
    // (2) контекст похож на HashMap'у, также хранит всякие штуки по ключу
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // (3) при создании новой корутины контекст может быть изменён
    val newContext = newCoroutineContext(context)
    // к остальной части кода вернёмся позже
}

Let's go through in more detail the points that I highlighted in the comments to the code:

1) To create a new coroutine you need to call launch within CoroutineScopeno other way, it was done so that the coroutine could get a context and pass it to child coroutines, if you look at the source CoroutineScopethen everything will become obvious:

interface CoroutineScope {
    val coroutineContext: CoroutineContext
}

When a coroutine is started, the current context is taken from CoroutineScope in which he was summoned launch.

2) CoroutineContext is not a hash table, but is implemented based on Pattern Composer:

/*
основной прикол паттерна Компоновщик: создать дерево 
из вложенных друг в друга объектов

чтобы реализовать такой паттерн нужен общий родитель, 
которым в данном случае является CoroutineContext 
*/


/*
обычные элементы контекста, такие как Job, CoroutineName и тд являются 
простыми объектами, которые не содержат другие или в терминах паттерна 
листовыми узлами 
*/
interface Element : CoroutineContext {
    val key: Key<*>

    /*
    для удобного доступа к элементам контекста переопределён оператор get
    
    это позволяет делать такие штуки: 
    coroutineContext[Job] вместо coroutineContext.get(Job)
    */
    override operator fun <E : Element> get(key: Key<E>): E? =
        if (this.key == key) this as E else null
}

/*
помимо листовых узлов есть комплексные наборы данных, 
которые могут в себя включать другие такие наборы 
и простые объекты
*/
class CombinedContext(
    val left: CoroutineContext, 
    val element: CoroutineContext.Element
) : CoroutineContext {

    override fun <E : Element> get(key: Key<E>): E? {
        /*
        логика простая: проверяем сначала простой объект,
        если ключи не совпадают, смотрим left, если он является 
        комплексным узлом CombinedContext, рекурсивно повторяем
        */
        var currentContext = this
        while (true) {
            currentContext.element[key]?.let { return it }
            val next = currentContext.left
            if (next is CombinedContext) {
                currentContext = next
            } else {
                return currentContext.get(key)
            }
        }
    }
  
}

class CoroutineName(val name: String) : CoroutineContext.Element {
    /*
    в качестве ключей для обычных элементов CoroutineContext 
    используются названия самих классов, что очень удобно

    поле key требует наследника Key<*> который определён ниже 
    через companion object, это работает даже если на первый взгляд 
    выглядит сомнительно и неоднозначно
    */
    override val key: Key<*> = CoroutineName 

    companion object Key : Key<CoroutineName>
}

fun main() {
    // Job и CoroutineDispatcher являются элементами CoroutineContext
    val combinedContext = CombinedContext(
        CombinedContext(
            Job(),
            Dispatchers.Default
        ), 
        CoroutineName("My name's Kotlin coroutine")
    )

    /*
    в итоге мы можем положить в CoroutineContext то что нужно корутинам:
    Job, CoroutineName, CoroutineDispatcher, CoroutineExceptionHandler и тд,
    а затем прокидывать контекст через CoroutineScope в сами корутины
    */
    val job = combinedContext[Job]
}

3) When creating a new coroutine, you can change CoroutineContext:

launch(CoroutineName("I'm a parent coroutine")) {
    launch(CoroutineName("I'm  child coroutine"))) {
        // ...
    }
}

The logic of operation in this case is as follows: the coroutine receives the current context from CoroutineScope and adds it to the context passed as a function parameter launchso the child coroutine from the example contains a different name.

Important point: not all elements CoroutineContext can be correctly changed, for example, when specifying a different one for the child coroutine Job you can destroy the principle Structured Concurrencywhich consists of cascading cancellation of all coroutines.

Let's sum it up:

  • coroutine is created via Kotlin Extension function CoroutineScope.launch

  • CoroutineScope is a simple interface that provides a context to a coroutine

  • CoroutineContext needed to store all sorts of useful things when running a coroutine: CoroutineName, CoroutineDispatcher, Job, CoroutineExceptionHandler etc

  • when creating a coroutine, you can pass a new context, this will result in the creation of a context based on the current one with modified elements taken from the new one

Continuation interface and implementation of suspend block

Let's return again to the initial example:

fun main() = runBlocking {
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

public fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job {}

Pay attention to the keyword suspendprecisely because launch executes the lambda marked with this keyword we can run suspend functions within a coroutine, what's even more interesting is that this lambda turns into something interesting during compilation:

BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    int label = 0;

    public final Object invokeSuspend(Object var1) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(var1);
                String var2 = "Some";
                System.out.println(var2);
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }

    // этот метод будет использоваться для создания Continuation объекта
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
    }

}), 3, (Object)null);

Aha, here it is, the very same state machine (switch block) that they casually mention in interviews; now it doesn’t do anything complicated except output text to the console and return an empty result.

After quite a few long evenings of digging into the source code and debugging, I finally figured out that the code generated above is nothing more than an implementation of an abstract class. ContinuationImplone of the heirs Continuation:

/*
само название говорит за себя, что это "продолжение" после приостановки 
корутины, да та самая магическая приостановка или SUSPENDED состояние о
которой говорят постоянно, позже узнаем что никакой магии тут нет
*/
public interface Continuation<in T> {
    /*
    как мы выяснили в предыдущем разделе CoroutineContext содержит
    важные штуки для корутин, например CoroutineDispatcher, который
    может пригодиться для переключения потоков
    */
    public val context: CoroutineContext

    /* 
    вызов этого метода происходит после возвращения корутины из состояния
    приостановки, именно сюда кладутся результаты suspend функций, также
    дочерние корутины могут вызывать этот метод у родительских для
    продолжения работы последних
    */
    public fun resumeWith(result: Result<T>)
}

Without such a thing as Continuation coroutines could not return to where the suspension occurred and therefore we could not execute code on different threads using sequential code writing, let me remind you that one of the key ideas is precisely the execution of asynchronous code as sequential, a small example:

// создаём корутину на главном потоке
launch {
    // вызываем функцию fetchAndroidUnderTheHoodPosts() в background потоке
    val myPosts = fetchAndroidUnderTheHoodPosts()
    /* 
    чтобы следующий код получил результат нужно как минимум
    каким-то образом получить его и не забыть переключиться на главный поток,
    а так как fetchAndroidUnderTheHoodPosts() выполняется на другом потоке 
    и неизвестно когда функция закончит своё выполнение, 
    остаётся только вариант передачи в функцию callback'а, 
    который будет вызван когда она завершится,
    таким callback'ом является Continuation объект
    */
    println(myPosts)
}

After the function fetchAndroidUnderTheHoodPosts will finish executing its code, the result will be passed through Continuation.resumeWith()which will lead to further execution of the coroutine, in the current example – outputting all posts to the console.

Okay, we've decided that without Continuation nothing will come of it and even found out that the suspend block is in launch functions are actually also Continuation object and inherits from ContinuationImplbut the main implementation is contained in BaseContinuationImpl class from which it inherits ContinuationImplis it difficult? No problem, get used to it, this is hardcore, not a cute cartoon about ponies.

Let's go see the implementation Continuation for suspend block:

/* 
компилятор генерирует реализацию этого 
абстрактного класса для suspend блоков
*/
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    // контекст берётся из корутины, дальше это увидим
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    private var intercepted: Continuation<Any?>? = null

    /*
    ContinuationInterceptor это штука которая оборачивает 
    текущий Continuation объект в новый, например DispatchedContinuation 
    и возвращает его, такой механизм используется для переключения потоков 
    через CoroutineDispatcher, кстати оборачивание одного объекта
    в другой с общим интерфейсом (Continuation) ничто иное как 
    паттерн Декоратор
    */
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    // очистка обёрнутого Continuation объекта: 
    // зануление ненужных ссылок и тд
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        // корутина закончила своё выполнение
        this.intercepted = CompletedContinuation
    }
}

/*
если в ContinuationImpl реализована поддержка CoroutineDispatcher'ов,
то в BaseContinuationImpl содержится основная логика работы с состоянием
приостановки
*/
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {и
    
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                /*
                текущая реализация Continuation требует в качестве completion
                более высокоуровневый Continuation объект, обычно им является 
                сама корутина
                напоминаю что мы сейчас рассматриваем реализацию 
                suspend блока в launch функции, а не саму корутину
                */
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        /*
                        вот тут происходит самое интересное, invokeSuspend 
                        выполняет внутри себя тот самый сгенерированный код
                        в котором содержатся наши suspend функции и если одна 
                        из них перешла в состояние приостановки, 
                        метод тупо делает return, Continuation в данном
                        случае не продолжит своё выполнение пока
                        не будет снова вызван resumeWith()
                        */
                        val outcome = invokeSuspend(param)
                        /*
                        COROUTINE_SUSPENDED - зарезервированная константа,
                        сигнализирующая о том, что внутри invokeSuspend 
                        произошла приостановка
                        */
                        if (outcome === COROUTINE_SUSPENDED) return
                        // если invokeSuspend вернул результат получаем его
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // если произошло исключение тоже получаем его
                        Result.failure(exception)
                    }
                /*
                так как текущий BaseContinuationImpl получил результат, 
                значит suspend блок в корутине завершился, поэтому 
                текущий объект BaseContinuationImpl больше не нужен,
                а следовательно всякие используемые штуки, 
                такие как CoroutineDispatcher'ы например должен быть очищены
                */
                releaseIntercepted()

                /*
                если мы запустили корутину в другой корутине, то в качестве
                completion будет suspend блок родительской корутины:
                  
                launch { родительский suspend блок BaseContinuationImpl
                  launch { дочерний suspend блок BaseContinuationImpl
                    
                  }
                }

                */
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    /*
                    вызывается самый высокоуровневый Continuation объект, 
                    в большинстве случаев это сама корутина
                    */
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // тот самый метод, переопределённый в сгенерированном коде
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

To sum it up:

  1. we have Continuation an interface that allows a coroutine to continue execution after it has been suspended

  2. a special implementation is generated for the suspend block ContinuationImpl with a state machine (switch or when construct) in an overridden method invokeSuspend()

  3. When the suspend function is paused the following things happen:
    invokeSuspend() returns a special value COROUTINE_SUSPENDED
    BaseContinuationImpl terminates with return and awaits the next call resumeWith()

  4. The logic for handling the suspended state is contained in BaseContinuationImpland the logic of switching flows using CoroutineDispatcher'ov occurs in the heir ContinuationImpl.

What is coroutine?

Now we can figure out what a coroutine actually is and how the suspend block is executed in it. To do this, we return to the source code again. launch functions:

public fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    /*
    мы не будем рассматривать LazyStandaloneCoroutine, так как эта штука
    очень похожа на базовую реализацию корутины StandaloneCoroutine,
    только запускается по требованию
    */
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    /*
    запускаем корутину, LazyStandaloneCoroutine не запустится таким образом,
    нужно будет вручную вызвать метод start() у объекта Job, 
    который возвращает launch
    */
    coroutine.start(start, coroutine, block)
    return coroutine
}

Remember in BaseContinuationImpl there was such a parameter as completion, I also said that it could be the coroutine itself, so here it is StandaloneCoroutine and there is a coroutine implementation, in short, let's not drag it out, let's go look at the source code:

// корутина наследует Job, Continuation и CoroutineScope
class StandaloneCoroutine<in T>(
    parentContext: CoroutineContext,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        /*
        кладёт Job текущей корутины в Job'у родительской
        это нужно чтобы родительские корутины знали о 
        дочерних и не завершались раньше них
        */
        initParentJob(parentContext[Job])
    }

    /* 
    так можно сделать потому что StandaloneCoroutine 
    является реализацией Job, а Job является одним из элементов 
    CoroutineContext'а
    */
    override val context: CoroutineContext = parentContext + this

    // вы уже знаете, что когда корутина выходит из состояния приостановки,
    // вызывается метод Continuation.resumeWith()
    override fun resumeWith(result: Result<T>) {
        // makeCompletingOnce пытается завершить Job'у корутины
        val state = makeCompletingOnce(result.toState())
        // если у текущей Job'ы есть дочерние и они не были завершены, 
        // корутина не может быть завершена
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

    /*
    запуск suspend блока в корутине происходит в CoroutineStart enum'е,
    
    в качестве параметра receiver передаётся сама корутина, это нужно
    чтобы suspend блок получил CoroutineContext, если уже забыли про это, 
    возвращайтесь к исходнику ContinuationImpl
    
    completion это как раз то самый высокоуровневый Continuation объект,
    вызываемый из сгенерированного Continuation объекта для suspend блока, 
    как я уже говорил ранее им является сама корутина
    */
    fun <R> start(
        start: CoroutineStart, 
        receiver: R, 
        block: suspend R.() -> T
    ) {
        start(block = block, receiver = receiver, competion = this)
    }
}

// поначалу я долго искал где запускается корутина, так как не сразу
// заметил что у CoroutineStart переопределён invoke оператор
enum class CoroutineStart {

    // я опустил остальные варианты, оставил только базовый
    DEFAULT;

    operator fun <R, T> invoke(
        block: suspend R.() -> T, 
        receiver: R, 
        completion: Continuation<T>
    ): Unit =
        when (this) {
            // для suspend блока будет сгенерированна ContinuationImpl
            // реализация, которая запустится при вызове launch функции
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            else -> Unit
        }

}

All that remains is to figure out how the suspend block is launched, for this we fall one level lower, in startCoroutineCancellable() function:

// обратите внимание, что startCoroutineCancellable() является
// Kotlin Extension функцией для suspend блока
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, 
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    // первым делом из suspend блока создаётся Continuation объект
    createCoroutineUnintercepted(receiver, completion)
        /*
        далее оборачивается в DispatchedContinuation, если используется
        CoroutineDispatcher, а он в большинстве случаев используется
        */
        .intercepted()
        /*
        ну и происходит вызов Continuation.resumeWith() 
        в зависимости от типа Continuation, чтобы блок в корутине
        начал выполняться, иначе ничего не произойдёт
        */
        .resumeCancellableWith(Result.success(Unit), onCancellation)

// создаёт Continuation из suspend блока
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    /* 
    в нашем случае suspend блок является объектом ContinuationImpl,
    а это наследник BaseContinuationImpl, поэтому выполняется 
    первая ветка
    */
    return if (this is BaseContinuationImpl)
        // метод create будет сгенерирован компилятором для ContinuationImpl
        // реализации, кстати ранее уже был пример сгенерированного кода
        create(receiver, completion)
    else
        ...
}

/*
делает вызов resumeWith в зависимости от типа Continuation:
1) DispatchedContinuation может передать вызов resumeWith() 
CoroutineDispatcher'у, который выполнит код на другом потоке
2) обычный вызов Continuation.resumeWith() произойдёт без
смены потоков и тд
*/
fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

We can say that you have completed Jedi training and are ready for a real fight, let's look at the very first example:

fun main() = runBlocking {
    // запускаем новую корутину
    launch {
        println("Hello, I'm a Kotlin coroutine, how are you?")
    }
}

If we look at the example from the point of view of the internals of coroutines, we get approximately the following code:

fun main() {
    SuspendLaunchBlock(
        // (2)
        completion = StandaloneCoroutine()
    ).resumeWith(Result.success(Unit)) // (3)
}

// (1)
class SuspendLaunchBlock(
    completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    // (5)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (newResult === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    // (4)
    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("Hello, I'm a Kotlin coroutine, how are you?")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class StandaloneCoroutine<T>(...) : Continuation<T> {

    // (6)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

Let's take it in order:

  1. Generated implementation is created SuspendLaunchBlock for suspend block in launch functions with a state machine or, more specifically, a when construct.

  2. It is being created StandaloneCoroutine and is passed as a completion parameter in SuspendLaunchBlock

  3. The coroutine is launched via a call SuspendLaunchBlock.resumeWith() method, which then executes the generated invokeSuspend() method

  4. IN invokeSuspend() the only branch in the when block is executed – output to the console and return an empty result

  5. After finishing invokeSuspend() V SuspendLaunchBlock.resumeWith() First, a check is made for the suspended state, in this case invokeSuspend() executed without pausing, so it is called immediately completion.resumeWith()and since completion is StandaloneCoroutinethen it is called StandaloneCoroutine.resumeWith() implementation

  6. StandaloneCoroutine.resumeWith() checks if there are any unfinished child coroutines, we don't have any, and stops execution

You are practically a Jedi Master! Take a break, make some tea or coffee, eat some chocolate and go through the example again, it is very important to understand that the mechanism of coroutines consists of transitions between suspension states through a call Continuation.resumeWith() method, and completion occurs at the root Continuation object and most importantly, there is no magic.

What if there is a chain of suspend functions in a coroutine?

Let's complicate the example from the previous section by adding two suspend functions:

// опустим подробности реализации
suspend fun fetchAuthToken() = ...
suspend fun fetchProfileInfo(token: String) = ...

fun main() = runBlocking {
    // запускаем корутины с двумя suspend функциями
    launch {
        val token = fetchAuthToken()
        val profile = fetchProfileInfo(token = token)
        println(profile)
    }
}

I wonder what the code will be now from the point of view of the internals of coroutines, let's look:

fun main() {
    SuspendLaunchBlock(
        // (2)
        completion = StandaloneCoroutine()
    ).resumeWith(Result.success(Unit)) // (3)
}

// (5)
suspend fun fetchAuthToken(continuation: SuspendLaunchBlock): Any? {
    /*
    любое асинхронное выполнение кода приводит к состоянию приостановки
    это необязательно использование многопоточности, дальше вы это увидите
    
    runCodeInBackground - магический метод, который выполняет 
    блок кода в фоновом потоке
    */
    runCodeInBackground {
        val token = ...
        runCodeInMain {
            // (6)
            // магический метод, который выполняет блок кода на главном потоке
            continuation.resumeWith(token)
        }
    }
  
    return COROUTINE_SUSPENDED
}

suspend fun fetchProfileInfo(token: String) = ...

// (1)
class SuspendLaunchBlock(
    completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        } 
        // (9)
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        // while(true) нужен чтобы выполнять ветки дальше, 
        // если suspend функция не перешла в состояние приостановки
        while (true) {
            when(label) {
                // (4)
                0 -> {
                    throwIfFailureResult(result)
                    label = 1
                    // Continuation передаётся в качестве 
                    // аргумента suspend функции
                    val state = fetchAuthToken(this)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (7)
                1 -> {
                    throwIfFailureResult(result)
                    label = 2
                    val token = result.unwrap()
                    val state = fetchProfileData(token, this)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (8)
                2 -> {
                    throwIfFailureResult(result)
                    val profile = result.unwrap()
                    println(profile)
                    break
                }
                else -> error("Illegal state")
            }
        }
        
        return Unit
    }
  
}

class StandaloneCoroutine<T>(...) : Continuation<T> {

    // (10)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

To make it more interesting, let's assume that both suspend functions go into the suspended state, let's look at the logic:

  1. Generated implementation is created SuspendLaunchBlock for suspend block in launch functions with a state machine or, more specifically, a when construct.

  2. It is being created StandaloneCoroutine and is passed as a completion parameter in SuspendLaunchBlock

  3. The coroutine is launched via a call SuspendLaunchBlock.resumeWith() method, which then executes the generated invokeSuspend() method

  4. IN invokeSuspend() the first branch is executed (label == 0), where the function call occurs fetchAuthToken()the current one is passed as the only parameter Continuation object, in this case it is SuspendLaunchBlockthe value of the variable label changes to 1

  5. Function fetchAuthToken() returns value COROUTINE_SUSPENDEDwhich indicates a suspended state, it is important that there is no magic here, the code execution occurs in another thread, and since this is asynchronous execution, the external code can only pass the result through a callback, which by the way is SuspendLaunchBlock

  6. After executing your code fetchAuthToken() calls the method SuspendLaunchBlock.resumeWith() with the result of its work, in the example it is a string with a token

  7. SuspendLaunchBlock.resumeWith() resumes its execution and calls invokeSuspend() again, where the second branch is already executed (label == 1), in it the fetchProfileData() method is called, as the first parameter it receives the token from the previous suspend function fetchAuthToken()and as the second, a link to the Continuation object, which, as we already know, is SuspendLaunchBlockmethod fetchProfileData() is performed similarly fetchAuthToken()label becomes equal to 2

  8. In the last branch invokeSuspend()where label == 2, the function result is output to the console fetchProfileData() and return an empty value

  9. This time the returned value is from invokeSuspend() is not COROUTINE_SUSPENDED therefore the execution SuspendLaunchBlock ends, further control is transferred StandaloneCoroutine via call completion.resumeWith()

  10. StandaloneCoroutine.resumeWith() checks if there are any unfinished child coroutines, we don't have any, and stops execution

Great, now you know how transitions between individual suspend functions occur, congratulations, you can safely show off your smarts at interviews, but don't forget that modesty makes a man look good)

Thread switching, delay() and CoroutineDispatcher

We forgot about the most important thing for which coroutines are used in principle – execution of suspend functions on other threads, hence the need to suspend execution of the current coroutine, as we have already found out, for this you need to pass Continuation object suspend function and wait until it calls itself Continuation.resumeWith() method:

fun fetchAuthToken(continuation: Continuation<Any?>): Any? {
    // магический метод, который выполняет блок кода в фоновом потоке
    runCodeInBackground {
        val token = ...
        // магический метод, который выполняет блок кода на главном потоке
        runCodeInMain {
            /* 
            чтобы сообщить корутине что suspend функция завершила
            своё выполнение нужно вызвать Continuation.resumeWith() 
            с результатом работы функции
            */
            continuation.resumeWith(token)
        }
    }
    // так как функция не может сразу вернуть результат, 
    // она переходит в состояние приостановки
    return COROUTINE_SUSPENDED
}

This is a fairly simplified version of the code, but it does reflect the general mechanism, and most importantly, it shows that the suspended state is nothing more than executing some code on another thread and returning the result through Continuation.resumeWith as through a regular callback.

In the previous section I briefly mentioned that any asynchronous execution leads to a suspended state, but I didn't cover this topic, let's figure it out.

What is needed to make the code asynchronous? The correct way is to make its execution independent of the current execution point, for example, to create a new thread:

fun main() {
    // создаётся новый поток и сразу запускается
    Thread {
        val sum = 3 + 7
        println(sum)
    }.start()
    // код main() продолжает выполняться независимо от того,
    // выполнился ли весь код в Thread
    val mul = 3 * 7
    println(mul)
    // функция main() может завершиться раньше созданного потока
}

But let's forget about multithreading for a minute and remember the main thread of Android, will it allow us to execute code asynchronously? Of course yes, you can do it through the good old Handler:

// Handler поставит выполнение кода в очередь главного потока
handler.post {
    println("I'll run soon")
}

By the way Handler used in the implementation of the function delay for the main thread of Android, if you forgot, this function allows you to make a delay without blocking the current thread:

fun delay(continuation: Continuation<Any?>): Any? {
    val block = Runnable {
        // после того как задержка пройдёт, выполнится этот блок кода
        // и корутина продолжит своё выполнение после приостановки
        continuation.resumeWith(Result.success(Unit))
    }
 
    // handler.postDelayed() выполняет block через указанный 
    // промежуток времени в миллисекундах
    if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
        // если корутина была отменена нужно отменить задержку
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    } else {
        // отменяет текущую корутину, так как Handler для главного потока
        // был закрыт
        cancelOnRejection(continuation.context, block)
    }

    return COROUTINE_SUSPENDED
}

That's the whole point of asynchronous code execution, it can be any mechanism that allows you to execute X code independently of Y code, and if you try to define coroutines now, it will be something like this:

A coroutine is an abstraction that wraps some asynchronous work, whether it's running code in another thread or using a main thread queue like Android's MessageQueue, into nice sequential code with cancellation mechanisms and other cool stuff.

Okay, it seems like we figured out what suspending a coroutine and asynchronous code execution are, we can move on to more practical things, for example, to the function withContext()most often used to change CoroutineDispatcher'A:

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    /*
    как мы знаем Continuation работает под капотом корутин и его нельзя
    получить явно в прикладном коде, поэтому была придумана inline функция
    suspendCoroutineUninterceptedOrReturn (и не одна кстати), 
    которая после компиляции подставит текущий Continuation объект
    */
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val oldContext = uCont.context
        /*
        если вы ещё не забыли то новый контекст производится 
        путём сложения двух контекстов, в качестве результата 
        мы имеем контекст в котором старые элементы заменены новыми, 
        например Dispatchers.Main можно поменять на Dispatchers.Default
        */
        val newContext = oldContext.newCoroutineContext(context)

        // проверка что корутина все ещё выполняется
        newContext.ensureActive()

        // мы не будем рассматривать все ветки, нас интересует только
        // переключение потоков через CoroutineDispatcher
        if (newContext === oldContext) {
            ...
        }
        // CoroutineDispatcher является наследником ContinuationInterceptor
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            ...
        }
        
        // ну вот опять создаётся какая-то неизвестная нам корутина,
        // не беспокойтесь там достаточно простая логика
        val coroutine = DispatchedCoroutine(newContext, uCont)
        // стартуем также как и обычную корутину
        block.startCoroutineCancellable(coroutine, coroutine)
        /*
        если есть возможность сразу отдать результат без приостановки корутины
        то withContext сразу завершится, в противном случае корутина, 
        содержащая withContext() вызов перейдёт в состояние приостановки
        */
        coroutine.getResult()
    }
}

Function withContext does 3 simple things:

  1. Gets the current Continuation object, it can be taken from the suspend parameter of the function or from the current coroutine in which the function was called withContext

  2. Takes context from Continuation object and adds it to the context passed as a parameter, resulting in a new context being created.

  3. Based on the new context, creates a certain type of coroutine, for example if it was changed CoroutineDispatcher a coroutine will be created DispatchedCoroutine

Well, let's look at the source code now. DispatchedCoroutine:

/*
чаще всего в качестве continuation параметра выступает 
Continuation объект, который генерируется для suspend блока в корутине, 
если забыли, то это реализация абстрактного класса ContinuationImpl
*/
internal class DispatchedCoroutine<in T> internal constructor(
    context: CoroutineContext,
    continuation: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

    /* 
    метод afterResume() вызывается перед завершением Continuation.resumeWith(),
    когда корутина закончила выполнять все свои suspend функции и 
    у неё больше нет дочерних корутин в состоянии выполнения
    */
    override fun afterResume(state: Any?) {
        /*
        метод afterResume() может быть вызван раньше getResult(), 
        например если блок кода в withContext() очень быстро выполнился
        в таком случае результат вернёт getResult()
        */
        if (tryResume()) return 
        /*
        Я уже вскользь упоминал что делает каждый метод в этой цепочке,
        когда мы рассматривали как стартует корутины, ещё раз повторим:
  
        intercepted() оборачивает continuation в DispatchedContinuation, 
        который реализует логику работы с CoroutineDispatcher'ами

        resumeCancellableWith() вызывает resumeWith() в зависимости от типа
        Continuation, в данном случае будет вызван метод
        DispatchedContinuation.resumeCancellableWith()
        */  
        continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

    internal fun getResult(): Any? {
        // если нельзя сразу вернуть результат корутина приостанавливается
        if (trySuspend()) return COROUTINE_SUSPENDED

        // результат выполнения withContext()
        val state = ...
        return state as T
    }
}

To sum it up, DispatchedCoroutine performs two key tasks:

  1. Adds the ability to return the result without going into the suspended state, if such an option exists, for this purpose it is used getResult() method.

  2. Switches Continuation the object in which it was called withContext()on the native thread, for example if your coroutine is running on the main thread, then calls withContext() in the background, then the result should return to the main one again.

Okay, okay. DispatchedCoroutine more or less figured it out, to understand how the switching of threads in coroutines actually happens, let's fall into DispatchedContinuationinto which others turn Continuation objects:

/*
DispatchedContinuation принимает на вход:

dispatcher - выполняет блок кода, чаще всего на другом потоке 
или с использованием очереди, например Handler / MessageQueue из Android 
continuation - Continuation объект, работа с которым будет происходить
через указанный диспатчер
*/
internal class DispatchedContinuation<in T>(
    val dispatcher: CoroutineDispatcher,
    val continuation: Continuation<T>
) : Continuation<T> by continuation {

    /*
    логика resumeWith() идентична resumeCancellableWith() с отличием только
    в разных режимах resumeMode, обычно чаще всего вызывается 
    именно resumeCancellableWith, так как режим MODE_CANCELLABLE 
    позволяет прокинуть CancellationException для отмены корутины
    */
    override fun resumeWith(result: Result<T>) { ... }

    internal inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        /*
        CoroutineDispatcher имеет два логически связанных метода:
        
        isDispatchNeeded() решает выполнять код в диспатчере или нет
        dispatch() выполняет код в диспатчере: код может выполниться на
        другом потоке, поставлен в очередь и тд
        */
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            // для наглядности я упростил блок кода и написал его здесь
            val block = Runnable {
                continuation.resumeWith(state)
            }
            dispatcher.dispatch(context, block)
        } else {
            /*
            если диспатчер не хочет выполнять код, а такое может быть
            например если диспатчер переключает на главный поток, а мы уже
            на главном потоке и внутри диспатчера реализована проверка, 
            то isDispatchNeeded() вернёт false, в таком случае выполнение
            корутины будет добавлено в EventLoop
            */
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

}

As you can see, the logic CoroutineDispatcher'and quite simple:

  1. The method is called isDispatchNeeded() to understand whether to give the execution of the code to the dispatcher or not, this is necessary to avoid unnecessary calls dispatch()for example, not to switch to the main thread if we are already on it

  2. If isDispatchNeeded() returned true of course we give the execution of the code to the dispatcher by calling the method dispatch()

  3. If isDispatchNeeded() returned false start coroutine on EventLoop'e, more on that in the next section

As an example, let's consider such interesting dispatchers from Android as: Dispatchers.Main And Dispatchers.Main.immediate:

val mainLooper = Looper.getMainLooper()
val handler = Handler(mainLooper)

// реализация для Dispatchers.Main
override fun isDispatchNeeded(...) = true

// реализация для Dispatchers.Main.immediate
override fun isDispatchNeeded(...): Boolean {
    // сравнивает Looper текущего потока с главным
    return Looper.myLooper() != mainLooper
}

override fun dispatch(
    context: CoroutineContext, 
    block: Runnable
) {
    // handler.post() выполняет блок кода на главном потоке
    handler.post(block)
}

Here's the whole difference between them: Dispatchers.Main always switches code execution to the main thread via Handler.post()A Dispatchers.Main.immediate only if the code is not running on the main thread.

To consolidate our knowledge, let's try to put everything together and describe the logic for the following example:

// примерно такой код можно встретить в рабочих проектах 
viewModelScope.launch {
    // я не стал выносить в отдельную функцию, чтобы не усложнять пример
    val posts = withContext(Dispatchers.IO) {
        try {
            // получаем список постов в background потоке
            apiService.fetchPosts()
        } catch (exception: Exception) {
            // важно прокидывать CancellationException дальше 
            // так как это часть механизма отмены корутины
            if (exception is CancellationException) throw exception
            emptyList()
        }
    }
    // отображаем данные на главном потоке
    println(posts)
}

Under the hood, all this code will look something like this:

// (1)
class ViewModelScopeLaunchBlock(
    completion: Continuation<Any?>
) : ContinuationImpl(completion) {

    var label = 0

    // (5)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        // (17)
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        // while(true) нужен чтобы выполнять ветки дальше, 
        // если suspend функция не перешла в состояние приостановки
        while (true) {
            when(label) {
                // (5)
                0 -> {
                    throwIfFailureResult(result)
                    label = 1
                    // Continuation передаётся в качестве 
                    // аргумента suspend функции
                    val state = fetchPosts(this)
                    // (10)
                    if (state == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED
                    }
                }
                // (16)
                1 -> {
                    throwIfFailureResult(result)
                    val profile = result.unwrap()
                    println(profile)
                    break
                }
                else -> error("Illegal state")
            }
        }
        
        return Unit
    }
  
}

class StandaloneCoroutine(...) {

    // (18)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

// (6)
class WithContextBlock(
    completion: DispatchedCoroutine
) : ContinuationImpl(completion) {

    var label = 0

    // (12)
    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
             Result.failure(exception)
        }
        // (12)
        completion.resumeWith(newResult)
    }

    // (11)
    fun invokeSuspend(result: Result<Any?>): Any? {
        try {
            val posts = apiService.fetchPosts()
            return posts
        } catch (exception: Exception) {
            // важно прокидывать CancellationException дальше так как это часть
            // механизма отмены корутины, вспомните resumeCancellableWith
            if (exception is CancellationException) throw exception
            return emptyList()
        }
    }

}

class DispatchedCoroutine(
    ...
    // (7)
    val continuation: ViewModelScopeLaunchBlock
): ScopeCoroutine(context, continuation) {

    // (13)
    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

    // (14)
    override fun afterResume(state: Any?) {
        if (tryResume()) return 
        continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
  
}

class DispatchedContinuation<in T>(
    val dispatcher: CoroutineDispatcher,
    val continuation: Continuation<T>
) : Continuation<T> by continuation {

    // (4, 9, 15)
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            val block = Runnable {
                continuation.resumeWith(state)
            }
            // (9, 15)
            dispatcher.dispatch(context, block)
        } else {
            // (4)
            continuation.resumeWith(state)
        }
    }

}

// (2)
val topLevelCoroutine = StandaloneCoroutine(...)

val viewModelScopeLaunchBlock = ViewModelScopeLaunchBlock(
    // (2)
    completion = topLevelCoroutine
)

// (3)
DispatchedContinuation(
  dispatcher = Dispachers.Main.immediate
  continuation = viewModelScopeLaunchBlock
).resumeCancellableWith(Result.success(Unit))

val withContextBlock = WithContextBlock(
    // (7)
    completion = DispatchedCoroutine(viewModelScopeLaunchBlock)
)

// (8)
DispatchedContinuation(
    dispatcher = Dispatchers.IO,
    continuation = withContextBlock
).resumeCancellableWith(Result.success(Unit))

Very confusing? I agree, let's go in order:

  1. The implementation is generated ViewModelScopeLaunchBlock for suspend block in viewModelScope.launch() functions with a state machine or, more specifically, a when construct.

  2. It is being created StandaloneCoroutine and is passed as a completion parameter in ViewModelScopeLaunchBlock

  3. ViewModelScopeLaunchBlock turns into DispatchedContinuationthe dispatcher that was specified in is passed as viewModelScopefor Android this dispatcher will be Dispatchers.Main.immediate

  4. Called DispatchedContinuation.resumeCancellableWith() For ViewModelScopeLaunchBlockwhere the check for the main thread occurs, but since viewModelScope.launch() and since it is executed on the main thread, there will be no switching through the dispatcher, and the method ViewModelScopeLaunchBlock.resumeWith() will be called directly

  5. IN ViewModelScopeLaunchBlock.resumeWith() a call is in progress invokeSuspend()where the first branch (label == 0) starts executing, in which is called fetchPosts()the first parameter is a reference to the current Continuation object, in this case it is ViewModelScopeLaunchBlockthe variable label becomes equal to 1

  6. In function fetchPosts() is called withContext()which is also like viewModelScope.launch() accepts a suspend block, so an implementation is generated WithContextBlock

  7. It is being created DispatchedCoroutine and is passed as a completion parameter in WithContextBlock.
    note that DispatchedCoroutine as an object Continuation accepts ViewModelScopeLaunchBlockthe block from which the function is called withContext()because we need to somehow return the result back.

  8. WithContextBlock starts similarly ViewModelScopeLaunchBlockalso turns into DispatchedContinuationonly now it is transmitted as a dispatcher Dispatchers.IO.

  9. Called DispatchedContinuation.resumeCancellableWith() For WithContextBlockwhere the main thread switches to the background thread via Dispatchers.IO dispatcher, WithContextBlock.resumeWith() will now run on a background thread

  10. DispatchedCoroutine cannot return the query result immediately and therefore fetchPosts() V ViewModelScopeLaunchBlock.invokeSuspend() returns COROUTINE_SUSPENDEDwhich leads to the suspension of the coroutine

  11. Method WithContextBlock.invokeSuspend() executes a single line of code – a request to the network and receiving a response on a background thread.

  12. When the request is completed the method WithContextBlock.invokeSuspend() will return the result in WithContextBlock.resumeWith()where the further sending of the result will occur via the call completion.resumeWith() into the coroutine object, in this case it is DispatchedCoroutine

  13. IN DispatchedCoroutine.resumeWith() first, a check will be made for child coroutines in the running state, and if there are none, as in this example, the code will be executed DispatchedCoroutine.afterResume()

  14. Method DispatchedCoroutine.afterResume() should return the result in ViewModelScopeLaunchBlockbut there is a problem here: WithContextBlock is currently running on the background thread provided by Dispatchers.IOA ViewModelScopeLaunchBlock must get the result on the main one, so the chain is called continuation.intercepted().resumeCancellableWith()method intercepted() will not re-wrap ViewModelScopeLaunchBlock V DispatchedContinuationthis is done for optimization

  15. Called again DispatchedContinuation.resumeCancellableWith() For ViewModelScopeLaunchBlockbut only now switching to the main stream occurs through the dispatcher Dispatchers.Main.immediateif you haven't forgotten it's under the hood Handler.post() challenge, in the end ViewModelScopeLaunchBlock.resumeWith() is executed on the main thread, and the list of posts is passed as a result

  16. IN ViewModelScopeLaunchBlock.resumeWith() the second call occurs ViewModelScopeLaunchBlock.invokeSuspend()now the second branch (label == 1) is executed, which takes the list of posts and outputs it to the console.

  17. Method ViewModelScopeLaunchBlock.invokeSuspend() completes successfully without pausing, so ViewModelScopeLaunchBlock.resumeWith() finishes its execution and makes a call completion.resumeWith()where the completion is a coroutine StandaloneCoroutine

  18. IN StandaloneCoroutine.resumeWith() a check is made for child coroutines in the running state, there are none in this example, the coroutine is completed.

If you've gotten this far, you're clearly not a simpleton, so be sure to take a break from the chocolate bar and try to follow the path of coroutines in your code in a similar way.

Unfortunately, this is not the end, coroutines can be launched in other coroutines and not one by one, but in whole batches, a similar mechanism with dispatchers works here, but in addition to it there is also EventLoopif you have already rested and are ready to make the final push, let's continue!

Child coroutines, EventLoop and runBlocking

It is important to differentiate the execution of child coroutines into two types:

  1. The child coroutine is launched via the dispatcher

  2. The child coroutine is executed on EventLoop'e

The first case occurs when the dispatcher always switches the execution of a coroutine, even if it is running on the correct thread, e.g. Dispatchers.Mainthe second is typical for dispatchers, where switching occurs only when necessary, a striking example from Android: Dispatchers.Main.immediatewhich switches execution of the coroutine to the main thread only if it is not running on it.

Well, let's look at both cases in order, starting with the first:

/*
Dispatchers.Main всегда переключает выполнение корутины 
на главный поток через Handler.post() механизм, 
даже если корутина и так выполняется на главном
*/
val uiScope = CoroutineScope(Dispatchers.Main + Job())
uiScope.launch {
    launch {
        println("I'm child coroutine #1")
    }
    launch {
        println("I'm child coroutine #2")
    }
    println("I'm parent coroutine")
}

// примерно во что всё это превратится

class UiScopeParentBlock(
    completion: StandaloneCoroutine
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm parent coroutine")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class UiScopeChild1Block(
    completion: UiScopeParentBlock
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm child coroutine #1")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class UiScopeChild2Block(
    completion: UiScopeParentBlock
) : ContinuationImpl(completion) {

    var label = 0

    fun resumeWith(result: Result<Any?>) {
        try {
            val newResult = invokeSuspend(result)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(newResult)
        } catch (exception: Throwable) {
            Result.failure(exception)
        }
        completion.resumeWith(newResult)
    }

    fun invokeSuspend(result: Result<Any?>): Any? {
        when(label) {
            0 -> {
                throwIfFailureResult(result)
                println("I'm child coroutine #2")
                return Unit
            }
            else -> error("Illegal state")
        }
    }
  
}

class StandaloneCoroutine(...) {

    fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        /*
        resumeWith() не завершится полностью пока 
        дочерние корутины не закончат своё выполнение

        когда мы рассматривали код StandaloneCoroutine, то можно было
        увидеть как происходит добавление Job'ы дочерней корутины в Job'у
        родительской, поэтому родительская корутина знает состояния 
        своих дочерних корутин
        */
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
  
}

// родительская корутина
val parentCoroutine = StandaloneCoroutine(...)

val uiScopeParentBlock = UiScopeParentBlock(
    completion = parentCoroutine
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeParentBlock
).resumeCancellableWith(Result.success(Unit))

// первая дочерняя корутина
val childCoroutine1 = StandaloneCoroutine(...)

val uiScopeChild1Block = UiScopeChild1Block(
    completion = childCoroutine1
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))

// вторая дочерняя корутина
val childCoroutine2 = StandaloneCoroutine(...)

val uiScopeChild1Block = UiScopeChild1Block(
    completion = childCoroutine2
)

DispatchedContinuation(
  dispatcher = Dispachers.Main
  continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))

I will not describe individual steps, as it was in the previous section, you can easily handle this yourself, besides, it will be good practice for consolidating knowledge, in short, the main point:

  1. Parental coroutine StandaloneCoroutine will not complete until all of its child coroutines have completed.
    When a new coroutine is created, its object Job added as a child element to the object Job parent coroutine, thanks to which coroutines can track the states of their children.

  2. suspend child coroutine blocks UiScopeChild1Block And UiScopeChild2Block will be wrapped in DispatchedContinuation and switched to the main stream via Handler.post() regardless of whether the parent coroutine was initially on the main thread or not, Dispatchers.Main Unlike Dispatchers.Main.immediate always makes a switch.

  3. An object Continuation the parent coroutine is not connected in any way with Continuation objects of the child coroutines, so when the latter completes the result will not be passed back to UiScopeParentBlockand there is no particular sense in this, as with, for example, withContext()which guarantees a sequential order of execution with the return of the result.

  4. Dispatchers in principle cannot guarantee the order of execution, since they execute code asynchronously, the same method Handler.post() from Android does not give 100% confidence that the code will always be executed in the order we planned.

We've generally figured out how to launch child coroutines through dispatchers, but what happens if, for example, we have a dispatcher Dispatchers.Main.immediate and all coroutines are executed on the main thread, child coroutines will not be switched again through Handler.post() how it was with Dispatchers.Mainin this case the so-called starts to work EventLoop:

// метод из DispatchedContinuation
internal inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        // ...
    } else {
        /*
        executeUnconfined делает одну из двух вещей:
        
        1) выполняет лямбду на EventLoop'е
        2) ставит лямбду в очередь EventLoop'а
        */
        executeUnconfined(state, MODE_CANCELLABLE) {
            continuation.resumeWith(result)
        }
    }
}

private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    // EventLoop - штука куда кладутся лямбды в очередь на исполнение
    val eventLoop = ThreadLocalEventLoop.eventLoop

    // isUnconfinedLoopActive изначательно равен false поэтому для
    // родительской корутины срабатывает вторая ветка, а для дочерних - первая
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        // выполнение дочерних корутин ставится в очередь EventLoop'а
        eventLoop.dispatchUnconfined(this)
        true
    } else {
        /*
        выполняет Continuation.resumeWith() для родительской корутины,
        дочерние в этот момент создаются и кладутся в очередь EventLoop'а, 
        после завершения инициализация родительской корутины дочерние
        по очереди берутся из EventLoop'а и выполняются
        */
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

Consider that EventLoop this is a simple task queue or lambda as in our case, where requests for execution of child coroutines are placed, then they are executed in the order in which they were added to the queue, just such an organization ensures sequential execution of coroutines:

/*
viewModelScope под капотом использует Dispatchers.Main.immediate,
который не будет переключать дочерние корутины, так как они и так
находятся на главном потоке, поэтому будет задействован 
механизм EventLoop'а
*/
viewModelScope.launch {
    launch {
        println("I'm the second!")
    }
    launch {
        println("I'm the third!")
    }
    /*
    весь блок кода в родительской корутине выполняется 
    до момента выполнения дочерних корутин, это необходимо
    чтобы поставить дочерние корутины в очередь, 
    а потом начать их выполнять
    */
    println("I'm the first!")
}

As an addition, I will give a couple more interesting features EventLoop'A:

  1. EventLoop inherited from CoroutineDispatcher and can be used in coroutines, for example this works runBlocking()

  2. EventLoop is created for each thread through the mechanism ThreadLocal variables, such as instance Looper class from Android

  3. Under the hood EventLoop lies ArrayDeque from Kotlin collections to form a task queue.

Finally, let's look at how runBlocking() waits for its coroutines to complete and respects their order, although the answer is obvious – it is used EventLoop:

/*
EventLoop создаётся на основе переданного CoroutineContext'а и кладётся 
в специальную корутину BlockingCoroutine, затем вызывается 
метод joinBlocking(), который ждёт пока все дочерние корутины 
не выполнятся
*/
actual fun <T> runBlocking(
    context: CoroutineContext, 
    block: suspend CoroutineScope.() -> T
): T {  
    val currentThread = Thread.currentThread()
    val eventLoop: EventLoop = ...
    val newContext: CoroutineContext = ...
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

private class BlockingCoroutine<T>(
    parentContext: CoroutineContext,
    private val blockedThread: Thread,
    private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {

    /*
    ожидание происходит в while(true) цикле, а любой бесконечный цикл 
    как вы уже догадываетесь блокирует текущий поток, отсюда 
    и название runBlocking
    чтобы завершить цикл ожидания нужно поменять isCompleted на false,
    это произойдет только когда все дочерние корутины завершатся
    */
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    if (isCompleted) break
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }
}

It is precisely because of the infinite cycle runBlocking() waits for all child coroutines to complete, while blocking the current thread.

I would like to add that the order in runBlocking() will only be guaranteed when coroutines are executed without any asynchronous calls or switches to other dispatchers, for example here the order will not be respected:

fun main() = runBlocking<Unit> {
    launch {
        val result = withContext(Dispatchers.IO) {
            delay(500)
            "I'm the second!"
        }
        println(result)
    }
    launch {
        println("I'm the first!")
    }
}

Conclusion

I didn’t think that I would make it to this point myself, the article turned out to be very voluminous and without a drop of self-confidence I declare that it was incredibly useful!

As a final word, I have collected a couple of facts:

  1. Coroutine is just a convenient abstraction with cool features over asynchronous code execution, an example of asynchronous execution can be switching to another thread, using the main thread queue (MessageQueue from Android), etc.

  2. Continuation – the building block on which almost the entire coroutine library is built is the simplest interface with a single method resumeWith()this method is called when the transition between the suspended state of the coroutine and its running state occurs.

  3. Suspended state – since coroutines allow you to write asynchronous code in a sequential style, a mechanism for returning to the points of execution of this code is needed, in most cases such a mechanism is implemented using callbacks, which are exactly what Continuation implementation.

  4. To the implementations Continuation interface include: regular coroutine StandaloneCoroutinegenerated suspend block based on ContinuationImplimplementation for dispatchers DispatchedContinuationcoroutine used in runBlocking() method – BlockingCoroutine and others.

  5. DispatchedContinuation wraps up others Continuation objects to pass execution to Continuation.resumeWith() method to the dispatcher.

  6. CoroutineDispatcher in most cases it is used to switch a coroutine to another or other threads, but there are exceptions, such as EventLoop for example, which allows you to execute coroutines in the correct order.

  7. EventLoop – this is a simple task queue where requests for execution of coroutines are placed, and then they are executed in the order in which they were added to the queue, such an organization ensures the correct order of execution, but this only works if there are no switches through dispatchers in any coroutine.

Useful links:

  1. My telegram channel

  2. Report from Yandex SHMR 2024

  3. Coroutines Course by Android Broadcast

  4. Official doc

  5. Cool talk from the creator of the library

  6. Library repository on Github

Write your opinion in the comments and good code to everyone!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *