Kotlin Coroutines Under the Hood
Most likely, you were asked at a job interview “how do coroutines work under the hood?”, you without thinking twice threw out something like “there is a state machine under the hood, it determines which suspend function will be executed”, but did you really understand everything that was being said? Perhaps only you know this, but to be honest, I understood my own answers to such questions very poorly, as paradoxical as it may sound, and even after a dozen interviews I did not have a complete picture of how the inside of this truly incredible library of “sweet asynchronousness” works.
Okay, the introduction turned out to be too drawn out, as it happens in unprofessional films from which you expect action for half the running time, and then you get dull fights with glow sticks, let's go and figure it out!
Introducing the Cool Guys: CoroutineContext and CoroutineScope
Let's start with a simple example:
fun main() = runBlocking {
// запускаем новую корутину
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
For now we are only interested in the function launch
which is most often used to create coroutines, let's dive into its source code:
// (1) launch является Kotlin Extension функцией для CoroutineScope
fun CoroutineScope.launch(
// (2) контекст похож на HashMap'у, также хранит всякие штуки по ключу
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// (3) при создании новой корутины контекст может быть изменён
val newContext = newCoroutineContext(context)
// к остальной части кода вернёмся позже
}
Let's go through in more detail the points that I highlighted in the comments to the code:
1) To create a new coroutine you need to call launch
within CoroutineScope
no other way, it was done so that the coroutine could get a context and pass it to child coroutines, if you look at the source CoroutineScope
then everything will become obvious:
interface CoroutineScope {
val coroutineContext: CoroutineContext
}
When a coroutine is started, the current context is taken from CoroutineScope
in which he was summoned launch
.
2) CoroutineContext
is not a hash table, but is implemented based on Pattern Composer:
/*
основной прикол паттерна Компоновщик: создать дерево
из вложенных друг в друга объектов
чтобы реализовать такой паттерн нужен общий родитель,
которым в данном случае является CoroutineContext
*/
/*
обычные элементы контекста, такие как Job, CoroutineName и тд являются
простыми объектами, которые не содержат другие или в терминах паттерна
листовыми узлами
*/
interface Element : CoroutineContext {
val key: Key<*>
/*
для удобного доступа к элементам контекста переопределён оператор get
это позволяет делать такие штуки:
coroutineContext[Job] вместо coroutineContext.get(Job)
*/
override operator fun <E : Element> get(key: Key<E>): E? =
if (this.key == key) this as E else null
}
/*
помимо листовых узлов есть комплексные наборы данных,
которые могут в себя включать другие такие наборы
и простые объекты
*/
class CombinedContext(
val left: CoroutineContext,
val element: CoroutineContext.Element
) : CoroutineContext {
override fun <E : Element> get(key: Key<E>): E? {
/*
логика простая: проверяем сначала простой объект,
если ключи не совпадают, смотрим left, если он является
комплексным узлом CombinedContext, рекурсивно повторяем
*/
var currentContext = this
while (true) {
currentContext.element[key]?.let { return it }
val next = currentContext.left
if (next is CombinedContext) {
currentContext = next
} else {
return currentContext.get(key)
}
}
}
}
class CoroutineName(val name: String) : CoroutineContext.Element {
/*
в качестве ключей для обычных элементов CoroutineContext
используются названия самих классов, что очень удобно
поле key требует наследника Key<*> который определён ниже
через companion object, это работает даже если на первый взгляд
выглядит сомнительно и неоднозначно
*/
override val key: Key<*> = CoroutineName
companion object Key : Key<CoroutineName>
}
fun main() {
// Job и CoroutineDispatcher являются элементами CoroutineContext
val combinedContext = CombinedContext(
CombinedContext(
Job(),
Dispatchers.Default
),
CoroutineName("My name's Kotlin coroutine")
)
/*
в итоге мы можем положить в CoroutineContext то что нужно корутинам:
Job, CoroutineName, CoroutineDispatcher, CoroutineExceptionHandler и тд,
а затем прокидывать контекст через CoroutineScope в сами корутины
*/
val job = combinedContext[Job]
}
3) When creating a new coroutine, you can change CoroutineContext
:
launch(CoroutineName("I'm a parent coroutine")) {
launch(CoroutineName("I'm child coroutine"))) {
// ...
}
}
The logic of operation in this case is as follows: the coroutine receives the current context from CoroutineScope
and adds it to the context passed as a function parameter launch
so the child coroutine from the example contains a different name.
Important point: not all elements CoroutineContext
can be correctly changed, for example, when specifying a different one for the child coroutine Job
you can destroy the principle Structured Concurrencywhich consists of cascading cancellation of all coroutines.
Let's sum it up:
coroutine is created via Kotlin Extension function
CoroutineScope.launch
CoroutineScope
is a simple interface that provides a context to a coroutineCoroutineContext
needed to store all sorts of useful things when running a coroutine:CoroutineName
,CoroutineDispatcher
,Job
,CoroutineExceptionHandler
etcwhen creating a coroutine, you can pass a new context, this will result in the creation of a context based on the current one with modified elements taken from the new one
Continuation interface and implementation of suspend block
Let's return again to the initial example:
fun main() = runBlocking {
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
public fun CoroutineScope.launch(
// ...
block: suspend CoroutineScope.() -> Unit
): Job {}
Pay attention to the keyword suspend
precisely because launch
executes the lambda marked with this keyword we can run suspend
functions within a coroutine, what's even more interesting is that this lambda turns into something interesting during compilation:
BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label = 0;
public final Object invokeSuspend(Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "Some";
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
// этот метод будет использоваться для создания Continuation объекта
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
}), 3, (Object)null);
Aha, here it is, the very same state machine (switch block) that they casually mention in interviews; now it doesn’t do anything complicated except output text to the console and return an empty result.
After quite a few long evenings of digging into the source code and debugging, I finally figured out that the code generated above is nothing more than an implementation of an abstract class. ContinuationImpl
one of the heirs Continuation
:
/*
само название говорит за себя, что это "продолжение" после приостановки
корутины, да та самая магическая приостановка или SUSPENDED состояние о
которой говорят постоянно, позже узнаем что никакой магии тут нет
*/
public interface Continuation<in T> {
/*
как мы выяснили в предыдущем разделе CoroutineContext содержит
важные штуки для корутин, например CoroutineDispatcher, который
может пригодиться для переключения потоков
*/
public val context: CoroutineContext
/*
вызов этого метода происходит после возвращения корутины из состояния
приостановки, именно сюда кладутся результаты suspend функций, также
дочерние корутины могут вызывать этот метод у родительских для
продолжения работы последних
*/
public fun resumeWith(result: Result<T>)
}
Without such a thing as Continuation
coroutines could not return to where the suspension occurred and therefore we could not execute code on different threads using sequential code writing, let me remind you that one of the key ideas is precisely the execution of asynchronous code as sequential, a small example:
// создаём корутину на главном потоке
launch {
// вызываем функцию fetchAndroidUnderTheHoodPosts() в background потоке
val myPosts = fetchAndroidUnderTheHoodPosts()
/*
чтобы следующий код получил результат нужно как минимум
каким-то образом получить его и не забыть переключиться на главный поток,
а так как fetchAndroidUnderTheHoodPosts() выполняется на другом потоке
и неизвестно когда функция закончит своё выполнение,
остаётся только вариант передачи в функцию callback'а,
который будет вызван когда она завершится,
таким callback'ом является Continuation объект
*/
println(myPosts)
}
After the function fetchAndroidUnderTheHoodPosts
will finish executing its code, the result will be passed through Continuation.resumeWith()
which will lead to further execution of the coroutine, in the current example – outputting all posts to the console.
Okay, we've decided that without Continuation
nothing will come of it and even found out that the suspend block is in launch
functions are actually also Continuation
object and inherits from ContinuationImpl
but the main implementation is contained in BaseContinuationImpl
class from which it inherits ContinuationImpl
is it difficult? No problem, get used to it, this is hardcore, not a cute cartoon about ponies.
Let's go see the implementation Continuation
for suspend block:
/*
компилятор генерирует реализацию этого
абстрактного класса для suspend блоков
*/
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
// контекст берётся из корутины, дальше это увидим
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
private var intercepted: Continuation<Any?>? = null
/*
ContinuationInterceptor это штука которая оборачивает
текущий Continuation объект в новый, например DispatchedContinuation
и возвращает его, такой механизм используется для переключения потоков
через CoroutineDispatcher, кстати оборачивание одного объекта
в другой с общим интерфейсом (Continuation) ничто иное как
паттерн Декоратор
*/
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// очистка обёрнутого Continuation объекта:
// зануление ненужных ссылок и тд
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
}
// корутина закончила своё выполнение
this.intercepted = CompletedContinuation
}
}
/*
если в ContinuationImpl реализована поддержка CoroutineDispatcher'ов,
то в BaseContinuationImpl содержится основная логика работы с состоянием
приостановки
*/
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {и
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
/*
текущая реализация Continuation требует в качестве completion
более высокоуровневый Continuation объект, обычно им является
сама корутина
напоминаю что мы сейчас рассматриваем реализацию
suspend блока в launch функции, а не саму корутину
*/
val completion = completion!!
val outcome: Result<Any?> =
try {
/*
вот тут происходит самое интересное, invokeSuspend
выполняет внутри себя тот самый сгенерированный код
в котором содержатся наши suspend функции и если одна
из них перешла в состояние приостановки,
метод тупо делает return, Continuation в данном
случае не продолжит своё выполнение пока
не будет снова вызван resumeWith()
*/
val outcome = invokeSuspend(param)
/*
COROUTINE_SUSPENDED - зарезервированная константа,
сигнализирующая о том, что внутри invokeSuspend
произошла приостановка
*/
if (outcome === COROUTINE_SUSPENDED) return
// если invokeSuspend вернул результат получаем его
Result.success(outcome)
} catch (exception: Throwable) {
// если произошло исключение тоже получаем его
Result.failure(exception)
}
/*
так как текущий BaseContinuationImpl получил результат,
значит suspend блок в корутине завершился, поэтому
текущий объект BaseContinuationImpl больше не нужен,
а следовательно всякие используемые штуки,
такие как CoroutineDispatcher'ы например должен быть очищены
*/
releaseIntercepted()
/*
если мы запустили корутину в другой корутине, то в качестве
completion будет suspend блок родительской корутины:
launch { родительский suspend блок BaseContinuationImpl
launch { дочерний suspend блок BaseContinuationImpl
}
}
*/
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
/*
вызывается самый высокоуровневый Continuation объект,
в большинстве случаев это сама корутина
*/
completion.resumeWith(outcome)
return
}
}
}
}
// тот самый метод, переопределённый в сгенерированном коде
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
To sum it up:
we have
Continuation
an interface that allows a coroutine to continue execution after it has been suspendeda special implementation is generated for the suspend block
ContinuationImpl
with a state machine (switch or when construct) in an overridden methodinvokeSuspend()
When the suspend function is paused the following things happen:
–invokeSuspend()
returns a special valueCOROUTINE_SUSPENDED
–BaseContinuationImpl
terminates with return and awaits the next callresumeWith()
The logic for handling the suspended state is contained in
BaseContinuationImpl
and the logic of switching flows usingCoroutineDispatcher
'ov occurs in the heirContinuationImpl
.
What is coroutine?
Now we can figure out what a coroutine actually is and how the suspend block is executed in it. To do this, we return to the source code again. launch
functions:
public fun CoroutineScope.launch(
// ...
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
/*
мы не будем рассматривать LazyStandaloneCoroutine, так как эта штука
очень похожа на базовую реализацию корутины StandaloneCoroutine,
только запускается по требованию
*/
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
/*
запускаем корутину, LazyStandaloneCoroutine не запустится таким образом,
нужно будет вручную вызвать метод start() у объекта Job,
который возвращает launch
*/
coroutine.start(start, coroutine, block)
return coroutine
}
Remember in BaseContinuationImpl
there was such a parameter as completion, I also said that it could be the coroutine itself, so here it is StandaloneCoroutine
and there is a coroutine implementation, in short, let's not drag it out, let's go look at the source code:
// корутина наследует Job, Continuation и CoroutineScope
class StandaloneCoroutine<in T>(
parentContext: CoroutineContext,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
/*
кладёт Job текущей корутины в Job'у родительской
это нужно чтобы родительские корутины знали о
дочерних и не завершались раньше них
*/
initParentJob(parentContext[Job])
}
/*
так можно сделать потому что StandaloneCoroutine
является реализацией Job, а Job является одним из элементов
CoroutineContext'а
*/
override val context: CoroutineContext = parentContext + this
// вы уже знаете, что когда корутина выходит из состояния приостановки,
// вызывается метод Continuation.resumeWith()
override fun resumeWith(result: Result<T>) {
// makeCompletingOnce пытается завершить Job'у корутины
val state = makeCompletingOnce(result.toState())
// если у текущей Job'ы есть дочерние и они не были завершены,
// корутина не может быть завершена
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
/*
запуск suspend блока в корутине происходит в CoroutineStart enum'е,
в качестве параметра receiver передаётся сама корутина, это нужно
чтобы suspend блок получил CoroutineContext, если уже забыли про это,
возвращайтесь к исходнику ContinuationImpl
completion это как раз то самый высокоуровневый Continuation объект,
вызываемый из сгенерированного Continuation объекта для suspend блока,
как я уже говорил ранее им является сама корутина
*/
fun <R> start(
start: CoroutineStart,
receiver: R,
block: suspend R.() -> T
) {
start(block = block, receiver = receiver, competion = this)
}
}
// поначалу я долго искал где запускается корутина, так как не сразу
// заметил что у CoroutineStart переопределён invoke оператор
enum class CoroutineStart {
// я опустил остальные варианты, оставил только базовый
DEFAULT;
operator fun <R, T> invoke(
block: suspend R.() -> T,
receiver: R,
completion: Continuation<T>
): Unit =
when (this) {
// для suspend блока будет сгенерированна ContinuationImpl
// реализация, которая запустится при вызове launch функции
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
else -> Unit
}
}
All that remains is to figure out how the suspend block is launched, for this we fall one level lower, in startCoroutineCancellable()
function:
// обратите внимание, что startCoroutineCancellable() является
// Kotlin Extension функцией для suspend блока
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R,
completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
// первым делом из suspend блока создаётся Continuation объект
createCoroutineUnintercepted(receiver, completion)
/*
далее оборачивается в DispatchedContinuation, если используется
CoroutineDispatcher, а он в большинстве случаев используется
*/
.intercepted()
/*
ну и происходит вызов Continuation.resumeWith()
в зависимости от типа Continuation, чтобы блок в корутине
начал выполняться, иначе ничего не произойдёт
*/
.resumeCancellableWith(Result.success(Unit), onCancellation)
// создаёт Continuation из suspend блока
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
/*
в нашем случае suspend блок является объектом ContinuationImpl,
а это наследник BaseContinuationImpl, поэтому выполняется
первая ветка
*/
return if (this is BaseContinuationImpl)
// метод create будет сгенерирован компилятором для ContinuationImpl
// реализации, кстати ранее уже был пример сгенерированного кода
create(receiver, completion)
else
...
}
/*
делает вызов resumeWith в зависимости от типа Continuation:
1) DispatchedContinuation может передать вызов resumeWith()
CoroutineDispatcher'у, который выполнит код на другом потоке
2) обычный вызов Continuation.resumeWith() произойдёт без
смены потоков и тд
*/
fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
We can say that you have completed Jedi training and are ready for a real fight, let's look at the very first example:
fun main() = runBlocking {
// запускаем новую корутину
launch {
println("Hello, I'm a Kotlin coroutine, how are you?")
}
}
If we look at the example from the point of view of the internals of coroutines, we get approximately the following code:
fun main() {
SuspendLaunchBlock(
// (2)
completion = StandaloneCoroutine()
).resumeWith(Result.success(Unit)) // (3)
}
// (1)
class SuspendLaunchBlock(
completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {
var label = 0
// (5)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (newResult === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
// (4)
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("Hello, I'm a Kotlin coroutine, how are you?")
return Unit
}
else -> error("Illegal state")
}
}
}
class StandaloneCoroutine<T>(...) : Continuation<T> {
// (6)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
Let's take it in order:
Generated implementation is created
SuspendLaunchBlock
for suspend block inlaunch
functions with a state machine or, more specifically, a when construct.It is being created
StandaloneCoroutine
and is passed as a completion parameter inSuspendLaunchBlock
The coroutine is launched via a call
SuspendLaunchBlock.resumeWith()
method, which then executes the generatedinvokeSuspend()
methodIN
invokeSuspend()
the only branch in the when block is executed – output to the console and return an empty resultAfter finishing
invokeSuspend()
VSuspendLaunchBlock.resumeWith()
First, a check is made for the suspended state, in this caseinvokeSuspend()
executed without pausing, so it is called immediatelycompletion.resumeWith()
and since completion isStandaloneCoroutine
then it is calledStandaloneCoroutine.resumeWith()
implementationStandaloneCoroutine.resumeWith()
checks if there are any unfinished child coroutines, we don't have any, and stops execution
You are practically a Jedi Master! Take a break, make some tea or coffee, eat some chocolate and go through the example again, it is very important to understand that the mechanism of coroutines consists of transitions between suspension states through a call Continuation.resumeWith()
method, and completion occurs at the root Continuation
object and most importantly, there is no magic.
What if there is a chain of suspend functions in a coroutine?
Let's complicate the example from the previous section by adding two suspend functions:
// опустим подробности реализации
suspend fun fetchAuthToken() = ...
suspend fun fetchProfileInfo(token: String) = ...
fun main() = runBlocking {
// запускаем корутины с двумя suspend функциями
launch {
val token = fetchAuthToken()
val profile = fetchProfileInfo(token = token)
println(profile)
}
}
I wonder what the code will be now from the point of view of the internals of coroutines, let's look:
fun main() {
SuspendLaunchBlock(
// (2)
completion = StandaloneCoroutine()
).resumeWith(Result.success(Unit)) // (3)
}
// (5)
suspend fun fetchAuthToken(continuation: SuspendLaunchBlock): Any? {
/*
любое асинхронное выполнение кода приводит к состоянию приостановки
это необязательно использование многопоточности, дальше вы это увидите
runCodeInBackground - магический метод, который выполняет
блок кода в фоновом потоке
*/
runCodeInBackground {
val token = ...
runCodeInMain {
// (6)
// магический метод, который выполняет блок кода на главном потоке
continuation.resumeWith(token)
}
}
return COROUTINE_SUSPENDED
}
suspend fun fetchProfileInfo(token: String) = ...
// (1)
class SuspendLaunchBlock(
completion: StandaloneCoroutine<Any?>
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (9)
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
// while(true) нужен чтобы выполнять ветки дальше,
// если suspend функция не перешла в состояние приостановки
while (true) {
when(label) {
// (4)
0 -> {
throwIfFailureResult(result)
label = 1
// Continuation передаётся в качестве
// аргумента suspend функции
val state = fetchAuthToken(this)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (7)
1 -> {
throwIfFailureResult(result)
label = 2
val token = result.unwrap()
val state = fetchProfileData(token, this)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (8)
2 -> {
throwIfFailureResult(result)
val profile = result.unwrap()
println(profile)
break
}
else -> error("Illegal state")
}
}
return Unit
}
}
class StandaloneCoroutine<T>(...) : Continuation<T> {
// (10)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
To make it more interesting, let's assume that both suspend functions go into the suspended state, let's look at the logic:
Generated implementation is created
SuspendLaunchBlock
for suspend block inlaunch
functions with a state machine or, more specifically, a when construct.It is being created
StandaloneCoroutine
and is passed as a completion parameter inSuspendLaunchBlock
The coroutine is launched via a call
SuspendLaunchBlock.resumeWith()
method, which then executes the generatedinvokeSuspend()
methodIN
invokeSuspend()
the first branch is executed (label == 0), where the function call occursfetchAuthToken()
the current one is passed as the only parameterContinuation
object, in this case it isSuspendLaunchBlock
the value of the variable label changes to 1Function
fetchAuthToken()
returns valueCOROUTINE_SUSPENDED
which indicates a suspended state, it is important that there is no magic here, the code execution occurs in another thread, and since this is asynchronous execution, the external code can only pass the result through a callback, which by the way isSuspendLaunchBlock
After executing your code
fetchAuthToken()
calls the methodSuspendLaunchBlock.resumeWith()
with the result of its work, in the example it is a string with a tokenSuspendLaunchBlock.resumeWith()
resumes its execution and calls invokeSuspend() again, where the second branch is already executed (label == 1), in it the fetchProfileData() method is called, as the first parameter it receives the token from the previous suspend functionfetchAuthToken()
and as the second, a link to the Continuation object, which, as we already know, isSuspendLaunchBlock
methodfetchProfileData()
is performed similarlyfetchAuthToken()
label becomes equal to 2In the last branch
invokeSuspend()
where label == 2, the function result is output to the consolefetchProfileData()
and return an empty valueThis time the returned value is from
invokeSuspend()
is notCOROUTINE_SUSPENDED
therefore the executionSuspendLaunchBlock
ends, further control is transferredStandaloneCoroutine
via callcompletion.resumeWith()
StandaloneCoroutine.resumeWith()
checks if there are any unfinished child coroutines, we don't have any, and stops execution
Great, now you know how transitions between individual suspend functions occur, congratulations, you can safely show off your smarts at interviews, but don't forget that modesty makes a man look good)
Thread switching, delay() and CoroutineDispatcher
We forgot about the most important thing for which coroutines are used in principle – execution of suspend functions on other threads, hence the need to suspend execution of the current coroutine, as we have already found out, for this you need to pass Continuation
object suspend function and wait until it calls itself Continuation.resumeWith()
method:
fun fetchAuthToken(continuation: Continuation<Any?>): Any? {
// магический метод, который выполняет блок кода в фоновом потоке
runCodeInBackground {
val token = ...
// магический метод, который выполняет блок кода на главном потоке
runCodeInMain {
/*
чтобы сообщить корутине что suspend функция завершила
своё выполнение нужно вызвать Continuation.resumeWith()
с результатом работы функции
*/
continuation.resumeWith(token)
}
}
// так как функция не может сразу вернуть результат,
// она переходит в состояние приостановки
return COROUTINE_SUSPENDED
}
This is a fairly simplified version of the code, but it does reflect the general mechanism, and most importantly, it shows that the suspended state is nothing more than executing some code on another thread and returning the result through Continuation.resumeWith
as through a regular callback.
In the previous section I briefly mentioned that any asynchronous execution leads to a suspended state, but I didn't cover this topic, let's figure it out.
What is needed to make the code asynchronous? The correct way is to make its execution independent of the current execution point, for example, to create a new thread:
fun main() {
// создаётся новый поток и сразу запускается
Thread {
val sum = 3 + 7
println(sum)
}.start()
// код main() продолжает выполняться независимо от того,
// выполнился ли весь код в Thread
val mul = 3 * 7
println(mul)
// функция main() может завершиться раньше созданного потока
}
But let's forget about multithreading for a minute and remember the main thread of Android, will it allow us to execute code asynchronously? Of course yes, you can do it through the good old Handler
:
// Handler поставит выполнение кода в очередь главного потока
handler.post {
println("I'll run soon")
}
By the way Handler
used in the implementation of the function delay
for the main thread of Android, if you forgot, this function allows you to make a delay without blocking the current thread:
fun delay(continuation: Continuation<Any?>): Any? {
val block = Runnable {
// после того как задержка пройдёт, выполнится этот блок кода
// и корутина продолжит своё выполнение после приостановки
continuation.resumeWith(Result.success(Unit))
}
// handler.postDelayed() выполняет block через указанный
// промежуток времени в миллисекундах
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
// если корутина была отменена нужно отменить задержку
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
// отменяет текущую корутину, так как Handler для главного потока
// был закрыт
cancelOnRejection(continuation.context, block)
}
return COROUTINE_SUSPENDED
}
That's the whole point of asynchronous code execution, it can be any mechanism that allows you to execute X code independently of Y code, and if you try to define coroutines now, it will be something like this:
A coroutine is an abstraction that wraps some asynchronous work, whether it's running code in another thread or using a main thread queue like Android's MessageQueue, into nice sequential code with cancellation mechanisms and other cool stuff.
Okay, it seems like we figured out what suspending a coroutine and asynchronous code execution are, we can move on to more practical things, for example, to the function withContext()
most often used to change CoroutineDispatcher
'A:
suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
/*
как мы знаем Continuation работает под капотом корутин и его нельзя
получить явно в прикладном коде, поэтому была придумана inline функция
suspendCoroutineUninterceptedOrReturn (и не одна кстати),
которая после компиляции подставит текущий Continuation объект
*/
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val oldContext = uCont.context
/*
если вы ещё не забыли то новый контекст производится
путём сложения двух контекстов, в качестве результата
мы имеем контекст в котором старые элементы заменены новыми,
например Dispatchers.Main можно поменять на Dispatchers.Default
*/
val newContext = oldContext.newCoroutineContext(context)
// проверка что корутина все ещё выполняется
newContext.ensureActive()
// мы не будем рассматривать все ветки, нас интересует только
// переключение потоков через CoroutineDispatcher
if (newContext === oldContext) {
...
}
// CoroutineDispatcher является наследником ContinuationInterceptor
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
...
}
// ну вот опять создаётся какая-то неизвестная нам корутина,
// не беспокойтесь там достаточно простая логика
val coroutine = DispatchedCoroutine(newContext, uCont)
// стартуем также как и обычную корутину
block.startCoroutineCancellable(coroutine, coroutine)
/*
если есть возможность сразу отдать результат без приостановки корутины
то withContext сразу завершится, в противном случае корутина,
содержащая withContext() вызов перейдёт в состояние приостановки
*/
coroutine.getResult()
}
}
Function withContext
does 3 simple things:
Gets the current
Continuation
object, it can be taken from the suspend parameter of the function or from the current coroutine in which the function was calledwithContext
Takes context from
Continuation
object and adds it to the context passed as a parameter, resulting in a new context being created.Based on the new context, creates a certain type of coroutine, for example if it was changed
CoroutineDispatcher
a coroutine will be createdDispatchedCoroutine
Well, let's look at the source code now. DispatchedCoroutine
:
/*
чаще всего в качестве continuation параметра выступает
Continuation объект, который генерируется для suspend блока в корутине,
если забыли, то это реализация абстрактного класса ContinuationImpl
*/
internal class DispatchedCoroutine<in T> internal constructor(
context: CoroutineContext,
continuation: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
/*
метод afterResume() вызывается перед завершением Continuation.resumeWith(),
когда корутина закончила выполнять все свои suspend функции и
у неё больше нет дочерних корутин в состоянии выполнения
*/
override fun afterResume(state: Any?) {
/*
метод afterResume() может быть вызван раньше getResult(),
например если блок кода в withContext() очень быстро выполнился
в таком случае результат вернёт getResult()
*/
if (tryResume()) return
/*
Я уже вскользь упоминал что делает каждый метод в этой цепочке,
когда мы рассматривали как стартует корутины, ещё раз повторим:
intercepted() оборачивает continuation в DispatchedContinuation,
который реализует логику работы с CoroutineDispatcher'ами
resumeCancellableWith() вызывает resumeWith() в зависимости от типа
Continuation, в данном случае будет вызван метод
DispatchedContinuation.resumeCancellableWith()
*/
continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
internal fun getResult(): Any? {
// если нельзя сразу вернуть результат корутина приостанавливается
if (trySuspend()) return COROUTINE_SUSPENDED
// результат выполнения withContext()
val state = ...
return state as T
}
}
To sum it up, DispatchedCoroutine
performs two key tasks:
Adds the ability to return the result without going into the suspended state, if such an option exists, for this purpose it is used
getResult()
method.Switches
Continuation
the object in which it was calledwithContext()
on the native thread, for example if your coroutine is running on the main thread, then callswithContext()
in the background, then the result should return to the main one again.
Okay, okay. DispatchedCoroutine
more or less figured it out, to understand how the switching of threads in coroutines actually happens, let's fall into DispatchedContinuation
into which others turn Continuation
objects:
/*
DispatchedContinuation принимает на вход:
dispatcher - выполняет блок кода, чаще всего на другом потоке
или с использованием очереди, например Handler / MessageQueue из Android
continuation - Continuation объект, работа с которым будет происходить
через указанный диспатчер
*/
internal class DispatchedContinuation<in T>(
val dispatcher: CoroutineDispatcher,
val continuation: Continuation<T>
) : Continuation<T> by continuation {
/*
логика resumeWith() идентична resumeCancellableWith() с отличием только
в разных режимах resumeMode, обычно чаще всего вызывается
именно resumeCancellableWith, так как режим MODE_CANCELLABLE
позволяет прокинуть CancellationException для отмены корутины
*/
override fun resumeWith(result: Result<T>) { ... }
internal inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
/*
CoroutineDispatcher имеет два логически связанных метода:
isDispatchNeeded() решает выполнять код в диспатчере или нет
dispatch() выполняет код в диспатчере: код может выполниться на
другом потоке, поставлен в очередь и тд
*/
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
// для наглядности я упростил блок кода и написал его здесь
val block = Runnable {
continuation.resumeWith(state)
}
dispatcher.dispatch(context, block)
} else {
/*
если диспатчер не хочет выполнять код, а такое может быть
например если диспатчер переключает на главный поток, а мы уже
на главном потоке и внутри диспатчера реализована проверка,
то isDispatchNeeded() вернёт false, в таком случае выполнение
корутины будет добавлено в EventLoop
*/
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}
As you can see, the logic CoroutineDispatcher
'and quite simple:
The method is called
isDispatchNeeded()
to understand whether to give the execution of the code to the dispatcher or not, this is necessary to avoid unnecessary callsdispatch()
for example, not to switch to the main thread if we are already on itIf
isDispatchNeeded()
returned true of course we give the execution of the code to the dispatcher by calling the methoddispatch()
If
isDispatchNeeded()
returned false start coroutine onEventLoop
'e, more on that in the next section
As an example, let's consider such interesting dispatchers from Android as: Dispatchers.Main
And Dispatchers.Main.immediate
:
val mainLooper = Looper.getMainLooper()
val handler = Handler(mainLooper)
// реализация для Dispatchers.Main
override fun isDispatchNeeded(...) = true
// реализация для Dispatchers.Main.immediate
override fun isDispatchNeeded(...): Boolean {
// сравнивает Looper текущего потока с главным
return Looper.myLooper() != mainLooper
}
override fun dispatch(
context: CoroutineContext,
block: Runnable
) {
// handler.post() выполняет блок кода на главном потоке
handler.post(block)
}
Here's the whole difference between them: Dispatchers.Main
always switches code execution to the main thread via Handler.post()
A Dispatchers.Main.immediate
only if the code is not running on the main thread.
To consolidate our knowledge, let's try to put everything together and describe the logic for the following example:
// примерно такой код можно встретить в рабочих проектах
viewModelScope.launch {
// я не стал выносить в отдельную функцию, чтобы не усложнять пример
val posts = withContext(Dispatchers.IO) {
try {
// получаем список постов в background потоке
apiService.fetchPosts()
} catch (exception: Exception) {
// важно прокидывать CancellationException дальше
// так как это часть механизма отмены корутины
if (exception is CancellationException) throw exception
emptyList()
}
}
// отображаем данные на главном потоке
println(posts)
}
Under the hood, all this code will look something like this:
// (1)
class ViewModelScopeLaunchBlock(
completion: Continuation<Any?>
) : ContinuationImpl(completion) {
var label = 0
// (5)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (17)
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
// while(true) нужен чтобы выполнять ветки дальше,
// если suspend функция не перешла в состояние приостановки
while (true) {
when(label) {
// (5)
0 -> {
throwIfFailureResult(result)
label = 1
// Continuation передаётся в качестве
// аргумента suspend функции
val state = fetchPosts(this)
// (10)
if (state == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
// (16)
1 -> {
throwIfFailureResult(result)
val profile = result.unwrap()
println(profile)
break
}
else -> error("Illegal state")
}
}
return Unit
}
}
class StandaloneCoroutine(...) {
// (18)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
// (6)
class WithContextBlock(
completion: DispatchedCoroutine
) : ContinuationImpl(completion) {
var label = 0
// (12)
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
// (12)
completion.resumeWith(newResult)
}
// (11)
fun invokeSuspend(result: Result<Any?>): Any? {
try {
val posts = apiService.fetchPosts()
return posts
} catch (exception: Exception) {
// важно прокидывать CancellationException дальше так как это часть
// механизма отмены корутины, вспомните resumeCancellableWith
if (exception is CancellationException) throw exception
return emptyList()
}
}
}
class DispatchedCoroutine(
...
// (7)
val continuation: ViewModelScopeLaunchBlock
): ScopeCoroutine(context, continuation) {
// (13)
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
// (14)
override fun afterResume(state: Any?) {
if (tryResume()) return
continuation.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
}
class DispatchedContinuation<in T>(
val dispatcher: CoroutineDispatcher,
val continuation: Continuation<T>
) : Continuation<T> by continuation {
// (4, 9, 15)
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
val block = Runnable {
continuation.resumeWith(state)
}
// (9, 15)
dispatcher.dispatch(context, block)
} else {
// (4)
continuation.resumeWith(state)
}
}
}
// (2)
val topLevelCoroutine = StandaloneCoroutine(...)
val viewModelScopeLaunchBlock = ViewModelScopeLaunchBlock(
// (2)
completion = topLevelCoroutine
)
// (3)
DispatchedContinuation(
dispatcher = Dispachers.Main.immediate
continuation = viewModelScopeLaunchBlock
).resumeCancellableWith(Result.success(Unit))
val withContextBlock = WithContextBlock(
// (7)
completion = DispatchedCoroutine(viewModelScopeLaunchBlock)
)
// (8)
DispatchedContinuation(
dispatcher = Dispatchers.IO,
continuation = withContextBlock
).resumeCancellableWith(Result.success(Unit))
Very confusing? I agree, let's go in order:
The implementation is generated
ViewModelScopeLaunchBlock
for suspend block inviewModelScope.launch()
functions with a state machine or, more specifically, a when construct.It is being created
StandaloneCoroutine
and is passed as a completion parameter inViewModelScopeLaunchBlock
ViewModelScopeLaunchBlock
turns intoDispatchedContinuation
the dispatcher that was specified in is passed asviewModelScope
for Android this dispatcher will beDispatchers.Main.immediate
Called
DispatchedContinuation.resumeCancellableWith()
ForViewModelScopeLaunchBlock
where the check for the main thread occurs, but sinceviewModelScope.launch()
and since it is executed on the main thread, there will be no switching through the dispatcher, and the methodViewModelScopeLaunchBlock.resumeWith()
will be called directlyIN
ViewModelScopeLaunchBlock.resumeWith()
a call is in progressinvokeSuspend()
where the first branch (label == 0) starts executing, in which is calledfetchPosts()
the first parameter is a reference to the currentContinuation
object, in this case it isViewModelScopeLaunchBlock
the variable label becomes equal to 1In function
fetchPosts()
is calledwithContext()
which is also likeviewModelScope.launch()
accepts a suspend block, so an implementation is generatedWithContextBlock
It is being created
DispatchedCoroutine
and is passed as a completion parameter inWithContextBlock
.
note thatDispatchedCoroutine
as an objectContinuation
acceptsViewModelScopeLaunchBlock
the block from which the function is calledwithContext()
because we need to somehow return the result back.WithContextBlock
starts similarlyViewModelScopeLaunchBlock
also turns intoDispatchedContinuation
only now it is transmitted as a dispatcherDispatchers.IO
.Called
DispatchedContinuation.resumeCancellableWith()
ForWithContextBlock
where the main thread switches to the background thread viaDispatchers.IO
dispatcher,WithContextBlock.resumeWith()
will now run on a background threadDispatchedCoroutine
cannot return the query result immediately and thereforefetchPosts()
VViewModelScopeLaunchBlock.invokeSuspend()
returnsCOROUTINE_SUSPENDED
which leads to the suspension of the coroutineMethod
WithContextBlock.invokeSuspend()
executes a single line of code – a request to the network and receiving a response on a background thread.When the request is completed the method
WithContextBlock.invokeSuspend()
will return the result inWithContextBlock.resumeWith()
where the further sending of the result will occur via the callcompletion.resumeWith()
into the coroutine object, in this case it isDispatchedCoroutine
IN
DispatchedCoroutine.resumeWith()
first, a check will be made for child coroutines in the running state, and if there are none, as in this example, the code will be executedDispatchedCoroutine.afterResume()
Method
DispatchedCoroutine.afterResume()
should return the result inViewModelScopeLaunchBlock
but there is a problem here:WithContextBlock
is currently running on the background thread provided byDispatchers.IO
AViewModelScopeLaunchBlock
must get the result on the main one, so the chain is calledcontinuation.intercepted().resumeCancellableWith()
methodintercepted()
will not re-wrapViewModelScopeLaunchBlock
VDispatchedContinuation
this is done for optimizationCalled again
DispatchedContinuation.resumeCancellableWith()
ForViewModelScopeLaunchBlock
but only now switching to the main stream occurs through the dispatcherDispatchers.Main.immediate
if you haven't forgotten it's under the hoodHandler.post()
challenge, in the endViewModelScopeLaunchBlock.resumeWith()
is executed on the main thread, and the list of posts is passed as a resultIN
ViewModelScopeLaunchBlock.resumeWith()
the second call occursViewModelScopeLaunchBlock.invokeSuspend()
now the second branch (label == 1) is executed, which takes the list of posts and outputs it to the console.Method
ViewModelScopeLaunchBlock.invokeSuspend()
completes successfully without pausing, soViewModelScopeLaunchBlock.resumeWith()
finishes its execution and makes a callcompletion.resumeWith()
where the completion is a coroutineStandaloneCoroutine
IN
StandaloneCoroutine.resumeWith()
a check is made for child coroutines in the running state, there are none in this example, the coroutine is completed.
If you've gotten this far, you're clearly not a simpleton, so be sure to take a break from the chocolate bar and try to follow the path of coroutines in your code in a similar way.
Unfortunately, this is not the end, coroutines can be launched in other coroutines and not one by one, but in whole batches, a similar mechanism with dispatchers works here, but in addition to it there is also EventLoop
if you have already rested and are ready to make the final push, let's continue!
Child coroutines, EventLoop and runBlocking
It is important to differentiate the execution of child coroutines into two types:
The child coroutine is launched via the dispatcher
The child coroutine is executed on
EventLoop
'e
The first case occurs when the dispatcher always switches the execution of a coroutine, even if it is running on the correct thread, e.g. Dispatchers.Main
the second is typical for dispatchers, where switching occurs only when necessary, a striking example from Android: Dispatchers.Main.immediate
which switches execution of the coroutine to the main thread only if it is not running on it.
Well, let's look at both cases in order, starting with the first:
/*
Dispatchers.Main всегда переключает выполнение корутины
на главный поток через Handler.post() механизм,
даже если корутина и так выполняется на главном
*/
val uiScope = CoroutineScope(Dispatchers.Main + Job())
uiScope.launch {
launch {
println("I'm child coroutine #1")
}
launch {
println("I'm child coroutine #2")
}
println("I'm parent coroutine")
}
// примерно во что всё это превратится
class UiScopeParentBlock(
completion: StandaloneCoroutine
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm parent coroutine")
return Unit
}
else -> error("Illegal state")
}
}
}
class UiScopeChild1Block(
completion: UiScopeParentBlock
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm child coroutine #1")
return Unit
}
else -> error("Illegal state")
}
}
}
class UiScopeChild2Block(
completion: UiScopeParentBlock
) : ContinuationImpl(completion) {
var label = 0
fun resumeWith(result: Result<Any?>) {
try {
val newResult = invokeSuspend(result)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(newResult)
} catch (exception: Throwable) {
Result.failure(exception)
}
completion.resumeWith(newResult)
}
fun invokeSuspend(result: Result<Any?>): Any? {
when(label) {
0 -> {
throwIfFailureResult(result)
println("I'm child coroutine #2")
return Unit
}
else -> error("Illegal state")
}
}
}
class StandaloneCoroutine(...) {
fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
/*
resumeWith() не завершится полностью пока
дочерние корутины не закончат своё выполнение
когда мы рассматривали код StandaloneCoroutine, то можно было
увидеть как происходит добавление Job'ы дочерней корутины в Job'у
родительской, поэтому родительская корутина знает состояния
своих дочерних корутин
*/
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
// родительская корутина
val parentCoroutine = StandaloneCoroutine(...)
val uiScopeParentBlock = UiScopeParentBlock(
completion = parentCoroutine
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeParentBlock
).resumeCancellableWith(Result.success(Unit))
// первая дочерняя корутина
val childCoroutine1 = StandaloneCoroutine(...)
val uiScopeChild1Block = UiScopeChild1Block(
completion = childCoroutine1
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))
// вторая дочерняя корутина
val childCoroutine2 = StandaloneCoroutine(...)
val uiScopeChild1Block = UiScopeChild1Block(
completion = childCoroutine2
)
DispatchedContinuation(
dispatcher = Dispachers.Main
continuation = uiScopeChild1Block
).resumeCancellableWith(Result.success(Unit))
I will not describe individual steps, as it was in the previous section, you can easily handle this yourself, besides, it will be good practice for consolidating knowledge, in short, the main point:
Parental coroutine
StandaloneCoroutine
will not complete until all of its child coroutines have completed.
When a new coroutine is created, its objectJob
added as a child element to the objectJob
parent coroutine, thanks to which coroutines can track the states of their children.suspend child coroutine blocks
UiScopeChild1Block
AndUiScopeChild2Block
will be wrapped inDispatchedContinuation
and switched to the main stream viaHandler.post()
regardless of whether the parent coroutine was initially on the main thread or not,Dispatchers.Main
UnlikeDispatchers.Main.immediate
always makes a switch.An object
Continuation
the parent coroutine is not connected in any way withContinuation
objects of the child coroutines, so when the latter completes the result will not be passed back toUiScopeParentBlock
and there is no particular sense in this, as with, for example,withContext()
which guarantees a sequential order of execution with the return of the result.Dispatchers in principle cannot guarantee the order of execution, since they execute code asynchronously, the same method
Handler.post()
from Android does not give 100% confidence that the code will always be executed in the order we planned.
We've generally figured out how to launch child coroutines through dispatchers, but what happens if, for example, we have a dispatcher Dispatchers.Main.immediate
and all coroutines are executed on the main thread, child coroutines will not be switched again through Handler.post()
how it was with Dispatchers.Main
in this case the so-called starts to work EventLoop
:
// метод из DispatchedContinuation
internal inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
// ...
} else {
/*
executeUnconfined делает одну из двух вещей:
1) выполняет лямбду на EventLoop'е
2) ставит лямбду в очередь EventLoop'а
*/
executeUnconfined(state, MODE_CANCELLABLE) {
continuation.resumeWith(result)
}
}
}
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
): Boolean {
// EventLoop - штука куда кладутся лямбды в очередь на исполнение
val eventLoop = ThreadLocalEventLoop.eventLoop
// isUnconfinedLoopActive изначательно равен false поэтому для
// родительской корутины срабатывает вторая ветка, а для дочерних - первая
return if (eventLoop.isUnconfinedLoopActive) {
_state = contState
resumeMode = mode
// выполнение дочерних корутин ставится в очередь EventLoop'а
eventLoop.dispatchUnconfined(this)
true
} else {
/*
выполняет Continuation.resumeWith() для родительской корутины,
дочерние в этот момент создаются и кладутся в очередь EventLoop'а,
после завершения инициализация родительской корутины дочерние
по очереди берутся из EventLoop'а и выполняются
*/
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}
Consider that EventLoop
this is a simple task queue or lambda as in our case, where requests for execution of child coroutines are placed, then they are executed in the order in which they were added to the queue, just such an organization ensures sequential execution of coroutines:
/*
viewModelScope под капотом использует Dispatchers.Main.immediate,
который не будет переключать дочерние корутины, так как они и так
находятся на главном потоке, поэтому будет задействован
механизм EventLoop'а
*/
viewModelScope.launch {
launch {
println("I'm the second!")
}
launch {
println("I'm the third!")
}
/*
весь блок кода в родительской корутине выполняется
до момента выполнения дочерних корутин, это необходимо
чтобы поставить дочерние корутины в очередь,
а потом начать их выполнять
*/
println("I'm the first!")
}
As an addition, I will give a couple more interesting features EventLoop
'A:
EventLoop
inherited fromCoroutineDispatcher
and can be used in coroutines, for example this worksrunBlocking()
EventLoop
is created for each thread through the mechanismThreadLocal
variables, such as instanceLooper
class from AndroidUnder the hood
EventLoop
liesArrayDeque
from Kotlin collections to form a task queue.
Finally, let's look at how runBlocking()
waits for its coroutines to complete and respects their order, although the answer is obvious – it is used EventLoop
:
/*
EventLoop создаётся на основе переданного CoroutineContext'а и кладётся
в специальную корутину BlockingCoroutine, затем вызывается
метод joinBlocking(), который ждёт пока все дочерние корутины
не выполнятся
*/
actual fun <T> runBlocking(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
val currentThread = Thread.currentThread()
val eventLoop: EventLoop = ...
val newContext: CoroutineContext = ...
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
/*
ожидание происходит в while(true) цикле, а любой бесконечный цикл
как вы уже догадываетесь блокирует текущий поток, отсюда
и название runBlocking
чтобы завершить цикл ожидания нужно поменять isCompleted на false,
это произойдет только когда все дочерние корутины завершатся
*/
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
if (isCompleted) break
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
}
It is precisely because of the infinite cycle runBlocking()
waits for all child coroutines to complete, while blocking the current thread.
I would like to add that the order in runBlocking()
will only be guaranteed when coroutines are executed without any asynchronous calls or switches to other dispatchers, for example here the order will not be respected:
fun main() = runBlocking<Unit> {
launch {
val result = withContext(Dispatchers.IO) {
delay(500)
"I'm the second!"
}
println(result)
}
launch {
println("I'm the first!")
}
}
Conclusion
I didn’t think that I would make it to this point myself, the article turned out to be very voluminous and without a drop of self-confidence I declare that it was incredibly useful!
As a final word, I have collected a couple of facts:
Coroutine is just a convenient abstraction with cool features over asynchronous code execution, an example of asynchronous execution can be switching to another thread, using the main thread queue (MessageQueue from Android), etc.
Continuation
– the building block on which almost the entire coroutine library is built is the simplest interface with a single methodresumeWith()
this method is called when the transition between the suspended state of the coroutine and its running state occurs.Suspended state – since coroutines allow you to write asynchronous code in a sequential style, a mechanism for returning to the points of execution of this code is needed, in most cases such a mechanism is implemented using callbacks, which are exactly what
Continuation
implementation.To the implementations
Continuation
interface include: regular coroutineStandaloneCoroutine
generated suspend block based onContinuationImpl
implementation for dispatchersDispatchedContinuation
coroutine used inrunBlocking()
method –BlockingCoroutine
and others.DispatchedContinuation
wraps up othersContinuation
objects to pass execution toContinuation.resumeWith()
method to the dispatcher.CoroutineDispatcher
in most cases it is used to switch a coroutine to another or other threads, but there are exceptions, such asEventLoop
for example, which allows you to execute coroutines in the correct order.EventLoop
– this is a simple task queue where requests for execution of coroutines are placed, and then they are executed in the order in which they were added to the queue, such an organization ensures the correct order of execution, but this only works if there are no switches through dispatchers in any coroutine.
Useful links:
Write your opinion in the comments and good code to everyone!