Kotlin Coroutines. Part 2

It's been a long time since the last article, and I apologize for keeping you waiting. The final year took up too much of my time, and I had no opportunity to write such serious material.

So, let's begin. First, a brief overview of this article. To warm up, we'll talk about the scope and what it's for, then move on to the context, where I'll try to explain a complex topic in simple terms, and finally we'll touch on Continuation.

1. Coroutine scope

In this part I would like to answer a question from the comments:

«Can you please tell me why the scope even exists? I mean, I literally couldn't find any functionality in the scope itself. So why isn't all this implemented directly in the context?»

As usual, let's approach the answer from a distance. As a reminder, CoroutineScope is an interface for managing the life cycle of coroutines. The interface itself declares only one property, coroutineContext; the functions for starting new coroutines and canceling existing ones are defined as extensions on it.

First of all, coroutine builders such as launch and async are extension functions on the CoroutineScope interface. This is easy to verify: let's open the declaration of any such builder, say launch.

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

So, CoroutineScope is responsible for managing the life cycle of coroutines, while the context, roughly speaking, stores the parameters a coroutine runs with. The context contains a Job object, which is also responsible for the life cycle, but only of a single coroutine. Looking ahead: the Jobs of nested coroutines reference each other, and a parent waits for all its child coroutines to complete. You'll agree this is where the confusion begins. If both are about the life cycle, why do we need the scope at all? Let's throw it out and be done with it.

Of course, it's not that simple. CoroutineScope is a high-level abstraction that groups several coroutines together, providing structured management of their life cycle and execution context. That is, we are talking about a group of coroutines here. Without a scope, we cannot guarantee that all coroutines will complete their work before moving on to the next step.
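To make the "group of coroutines" idea concrete, here is a minimal sketch. The helper name cancelGroup is made up for this illustration; it starts two workers in one scope and cancels them both through the scope with a single call.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts two workers in one scope, cancels the scope,
// and reports whether each worker's Job ended up cancelled.
fun cancelGroup(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val first = scope.launch { delay(10_000) }
    val second = scope.launch { delay(10_000) }

    scope.cancel()          // cancels the scope's Job and, with it, every child
    joinAll(first, second)  // wait until both children have finished cancelling

    listOf(first.isCancelled, second.isCancelled)
}

fun main() {
    println(cancelGroup()) // [true, true]
}
```

Neither worker was cancelled individually: the scope acted as the handle for the whole group.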

You might say, “How can we not? We just need to add runBlocking and that's it!” But no. There is an important detail to remember: runBlocking waits only for its own child coroutines. A coroutine started in a different scope, for example GlobalScope, is not its child, so runBlocking can return before that coroutine finishes.

import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class) // GlobalScope is a delicate API
fun main() {
    runBlocking {
        // Launch a coroutine
        launch {
            println("Coroutine 1 start!")
            // Nested coroutine launched in the global scope
            GlobalScope.launch {
                delay(2000L)
                println("Nested coroutine 2 in global context")
            }
            println("Coroutine 1 completed")
        }

        println("RunBlocking scope completed")
    }
    println("Bye Bye")
}

And this is the console output. It clearly shows that the program has exited before the nested coroutine finished its work:

RunBlocking scope completed
Coroutine 1 start!
Coroutine 1 completed
Bye Bye

We can declare our own scope using the coroutineScope builder. The provided scope inherits its coroutineContext from the outer scope, using the Job from that context as the parent for a new Job.

This function is designed for parallel decomposition of work. When any child coroutine in this scope fails, the whole scope fails and all other child coroutines are canceled. The function returns once the block and all its child coroutines have completed.

Both runBlocking and coroutineScope wait for all operations inside their block and all running child coroutines to complete. The difference between them is that runBlocking blocks the current thread, while coroutineScope only suspends, releasing the underlying thread for other work. Therefore, runBlocking is a regular function, while coroutineScope is a suspending function.
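A small sketch of that waiting behavior. The function loadAll and the log list are names invented for this example; the point is only the order in which events are recorded.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records the order of events to show that
// coroutineScope returns only after its child coroutine has finished.
suspend fun loadAll(log: MutableList<String>) {
    coroutineScope {
        launch {
            delay(100)
            log += "child done"
        }
        log += "block done"
    }
    log += "coroutineScope returned"
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    loadAll(log)
    println(log) // [block done, child done, coroutineScope returned]
}
```

The block itself finishes first, but coroutineScope does not return until the launched child is done, which is exactly what the GlobalScope example above could not guarantee.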

So, CoroutineContext itself only provides settings and parameters for executing coroutines. It does not provide mechanisms for managing a group of coroutines. CoroutineScope, in turn, introduces an abstraction for managing the life cycle of a group of coroutines that interact with each other.

I hope I was able to answer the question. Now we move on to the context.

2. Dispatcher and CoroutineContext: what's what

In the context of Kotlin coroutines, Dispatcher is responsible for determining which thread or threads a coroutine should execute in. Dispatchers are an integral part of the kotlinx.coroutines library and are used to manage the execution of coroutines.

There are several dispatchers provided by the library:

  • Dispatchers.Default: This dispatcher is designed to perform CPU-intensive operations and has a thread pool size equal to the number of cores on the machine your code is running on (but not less than two).

  • Dispatchers.IO: This dispatcher is designed for I/O-intensive (blocking) operations and allows more threads than the default one: by default, up to 64 threads or the number of cores, whichever is larger.

  • Dispatchers.Main: This dispatcher runs coroutines on the main (UI) thread, for example the main thread of an Android application.

You can also create your own custom dispatchers.
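As a rough illustration of a custom dispatcher, an ordinary Java executor can be turned into one with asCoroutineDispatcher(). The thread name "my-worker" and the helper function are made up for this sketch.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical helper: runs a block on a single-threaded custom dispatcher
// and returns the name of the thread that executed it.
fun threadNameOnCustomDispatcher(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "my-worker") }
    // use { } closes the dispatcher afterwards, which shuts the executor down
    return executor.asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            withContext(dispatcher) { Thread.currentThread().name }
        }
    }
}

fun main() {
    // Prints a name that starts with "my-worker"; in debug mode the
    // coroutines library may append the coroutine name to it.
    println(threadNameOnCustomDispatcher())
}
```

A dispatcher created this way owns its threads, so it should be closed when it is no longer needed.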

To use a dispatcher, you can pass it to the 'launch' or 'async' builder when starting the coroutine.

Mini example:

// Inside a coroutine (async needs a CoroutineScope, await needs a suspending context)
val result = async(Dispatchers.IO) {
    // run the task on another dispatcher
    fetchData()
}.await()

2.1 Let's talk about CoroutineContext

CoroutineContext is, like the scope, an interface. It provides the set of elements needed to execute coroutines. This is an important concept, because a coroutine always runs in some context that defines the “rules” of its operation. “Rules” is a loose term, used here only for ease of understanding.

First of all, let's open the CoroutineContext interface itself. It contains a nested interface, Element, which is the interesting part for us. As already mentioned, the context stores a set of parameters, and each of them can be obtained with the get method.

As the official documentation says: It is an indexed set of [Element] instances. An indexed set is a mix between a set and a map. Every element in this set has a unique [Key]. So it really is a kind of mix: the elements are unique, as in a set, but we look each one up by its key, as in a map. This can be confusing at first, but that's how it is.

Based on this, here is a list of some types that implement CoroutineContext.Element: Job, ContinuationInterceptor, CoroutineExceptionHandler, CoroutineName, CoroutineDispatcher (which is itself a ContinuationInterceptor) and others that define the behavior of the coroutine. We are already familiar with some of them from the first part, so let's look at the new ones.
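A short sketch of this map-like access. The describe function is a made-up helper; CoroutineName and Job are used as keys through their companion objects.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: looks elements up by key, the way you would query a map.
fun describe(context: CoroutineContext): String {
    val name = context[CoroutineName]?.name // the companion object CoroutineName.Key is the key
    val hasJob = context[Job] != null
    return "name=$name, hasJob=$hasJob"
}

fun main() {
    println(describe(CoroutineName("loader") + Job())) // name=loader, hasJob=true
    println(describe(Dispatchers.Default))             // name=null, hasJob=false
}
```

A missing element is not an error: get simply returns null for a key that is not in the context.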

ContinuationInterceptor: intercepts the coroutine's continuations and so controls how the coroutine is resumed. A simple analogy: imagine a subway train. It runs on a schedule, stops at each station, does something there (opens and closes the doors, changes drivers, says hello to someone, it doesn't matter), and then continues on its way. The coroutine itself determines where it stops (its suspension points); the ContinuationInterceptor determines how, and on which thread, it gets going again. CoroutineDispatcher is the most common ContinuationInterceptor.

CoroutineExceptionHandler: called when an exception in a coroutine is not handled anywhere else. It lets you define specific behavior for such errors, for example logging. Important: it only receives uncaught exceptions, and only from coroutines started with launch; async stores the exception in its Deferred, and await() rethrows it. The handler takes effect when installed on the scope or on a root coroutine, because child coroutines delegate exception handling to their parent.

When creating a coroutine with one of the builders from the kotlinx.coroutines library, you can pass a CoroutineContext as the first parameter. Its elements are added to the context inherited from the scope, replacing inherited elements with the same key, and the resulting context is available to the coroutine during its execution. Note that passing a fresh Job(), as in the snippet below, makes that Job the coroutine's parent instead of the scope's Job, so the coroutine is no longer a child of the surrounding scope.
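Here is a minimal sketch of when the handler is and is not called. The helper handledMessages and the exception messages are invented for this example; a SupervisorJob is assumed so that one failure does not cancel the scope.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: returns the messages that reached the handler.
fun handledMessages(): List<String> = runBlocking {
    val seen = CopyOnWriteArrayList<String>()
    val handler = CoroutineExceptionHandler { _, e -> seen += e.message ?: "no message" }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    // Root coroutine started with launch: the uncaught exception reaches the handler.
    scope.launch { throw IllegalStateException("from launch") }.join()

    // async keeps the exception in its Deferred; the handler is not called.
    val deferred = scope.async<Unit> { throw IllegalStateException("from async") }
    try {
        deferred.await()
    } catch (e: IllegalStateException) {
        // the exception is delivered to the caller of await() instead
    }

    scope.cancel()
    seen.toList()
}

fun main() {
    println(handledMessages()) // [from launch]
}
```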

launch(Dispatchers.Default + Job()) {
    println("Coroutine works in thread ${Thread.currentThread().name}")
}

2.2 Let's talk about creating context

The CoroutineContext interface has the following method:

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element -> …

The plus method of the CoroutineContext interface combines two contexts into one. When you call plus on one context and pass another context as the argument, you get a new context that contains the elements of both. Keep in mind that plus is not commutative: context1 + context2 is not the same as context2 + context1, because an element on the left is replaced by an element on the right that has the same key. This does not matter when combining elements with different keys ( Dispatchers.Default + Job() ), but it becomes important when both sides contain an element with the same key.

Inside plus, the fold method is called. What is it for?
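The effect of operand order can be checked with two elements that share a key. This is only a sketch; nameOf is a made-up helper.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: reads the CoroutineName element, if there is one.
fun nameOf(context: CoroutineContext): String? = context[CoroutineName]?.name

fun main() {
    val a = CoroutineName("a")
    val b = CoroutineName("b")

    // Both elements use the same key, so the right-hand operand wins.
    println(nameOf(a + b)) // b
    println(nameOf(b + a)) // a

    // Elements with different keys simply coexist, in either order.
    println(nameOf(Dispatchers.Default + a)) // a
    println(nameOf(a + Dispatchers.Default)) // a
}
```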

public fun <R> fold(initial: R, operation: (R, Element) -> R): R

If we need to do something for each element in the context, we can use the fold method, similar to the standard fold method for other collections.
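For instance, fold can count the elements of a context. The elementCount function is a made-up name for this sketch.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical helper: visits every element once and counts them.
fun elementCount(context: CoroutineContext): Int =
    context.fold(0) { count, _ -> count + 1 }

fun main() {
    println(elementCount(EmptyCoroutineContext))                     // 0
    println(elementCount(CoroutineName("x") + Dispatchers.Default))  // 2
    println(elementCount(CoroutineName("a") + CoroutineName("b")))   // 1, same key
}
```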

There are several ways to create a coroutine context:

  1. Using predefined context elements:

Example of creating a context using predefined elements:

val context = Dispatchers.Default + Job() + CoroutineExceptionHandler { _, exception ->
    println("Coroutine exception: $exception")
}

  2. Creating a custom context:

You can create a custom context by combining different context elements using the + operator.

Example of creating a custom context (MyCustomDispatcher and MyCustomExceptionHandler stand for your own implementations):

val customContext = MyCustomDispatcher + Job() + MyCustomExceptionHandler()

  3. Using the withContext function:

The withContext function allows you to execute a block of code in a specific context.

Example of using withContext:

suspend fun fetchData() {
    val data = withContext(Dispatchers.IO) {
        // Code that loads data on a background thread
    }
}

  4. Inheriting from the parent coroutine:

If a coroutine is created inside another coroutine, the new coroutine inherits the context of the parent.

Example:

val parentJob = Job()
GlobalScope.launch(parentJob) {
    // The new coroutine becomes a child of parentJob
}
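Inheritance is easier to see when the child really is started inside the parent. In this sketch (the helper name and the coroutine name "parent" are made up) the child passes no context of its own, yet it sees the parent's CoroutineName and is registered as a child of the parent's Job.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the name the child sees and whether
// the child's Job is registered among the parent's children.
fun inheritedContext(): Pair<String?, Boolean> = runBlocking(CoroutineName("parent")) {
    val parentJob = coroutineContext[Job]!!
    val child = async {
        // No context was passed to async: the name comes from the parent,
        // while the Job is a new one attached to parentJob.
        val name = coroutineContext[CoroutineName]?.name
        val isChild = parentJob.children.contains(coroutineContext[Job])
        name to isChild
    }
    child.await()
}

fun main() {
    println(inheritedContext()) // (parent, true)
}
```

Everything is inherited as-is except the Job: each coroutine gets its own Job, linked to the parent's.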

2.3 Context association with ThreadLocal

Spring Security provides great convenience when developing secure web applications. However, it relies heavily on the SecurityContext stored in a ThreadLocal (inside the SecurityContextHolder class). With Kotlin coroutines there is an extra level of abstraction: you don't actually know (and don't want to know) which threads your code will run on. A coroutine can run on different threads, and because of this the Spring Security context can be lost. The main problem is that a coroutine can start on one thread but continue on another, which is why such state has to travel with the coroutine in its CoroutineContext (how often this happens depends on the dispatcher: on a single-threaded dispatcher it never does, on Dispatchers.Default or Dispatchers.IO it is routine).

The default approach in Spring Security, keeping the security context thread-local, works well in traditional servlet applications where a request is handled entirely on one specific thread. Spring also provides additional support for asynchronous servlets and for creating your own threads or executors. If you are using Spring WebFlux (reactive Spring), Kotlin coroutines already work in conjunction with @EnableReactiveMethodSecurity. Otherwise, when using coroutines in a “traditional” Spring MVC environment, a different approach is required.

Kotlin coroutines are not thread-specific and therefore do not work well with ThreadLocal variables by default.

More details are in the article linked in the comment of the code example below.

A small code example:

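import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.ThreadContextElement
import org.springframework.security.core.context.SecurityContext
import org.springframework.security.core.context.SecurityContextHolder

/**
 * Use with withContext(IO + SecurityCoroutineContext()) for potentially blocking calls in suspend functions.
 * Before that, coroutines must be started with this argument: runBlocking(SecurityCoroutineContext()) { // call suspend functions }
 * https://blog.jdriven.com/2021/07/propagating-the-spring-securitycontext-to-your-kotlin-coroutines/
 */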
class SecurityCoroutineContext(
    private val securityContext: SecurityContext = SecurityContextHolder.getContext()
) : ThreadContextElement<SecurityContext?> {

    companion object Key : CoroutineContext.Key<SecurityCoroutineContext>

    override val key: CoroutineContext.Key<SecurityCoroutineContext> get() = Key

    override fun updateThreadContext(context: CoroutineContext): SecurityContext? {
        val previousSecurityContext = SecurityContextHolder.getContext()
        SecurityContextHolder.setContext(securityContext)
        return previousSecurityContext.takeIf { it.authentication != null }
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: SecurityContext?) {
        if (oldState == null) {
            SecurityContextHolder.clearContext()
        } else {
            SecurityContextHolder.setContext(oldState)
        }
    }
}
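The same mechanism can be tried without Spring. For a plain ThreadLocal, kotlinx.coroutines offers the asContextElement extension, which creates a ready-made ThreadContextElement. The sketch below is not the approach from the linked article; requestId and the value "req-42" are made up for the illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical thread-local holding some per-request value.
val requestId = ThreadLocal<String?>()

// Hypothetical helper: reads the thread-local inside a coroutine that
// runs on the multi-threaded Default dispatcher.
fun readInsideCoroutine(): String? = runBlocking {
    withContext(Dispatchers.Default + requestId.asContextElement(value = "req-42")) {
        delay(10)        // after suspending, the coroutine may resume on another worker thread
        requestId.get()  // the element installs the value on whichever thread resumes the coroutine
    }
}

fun main() {
    println(readInsideCoroutine()) // req-42
}
```

The value belongs to the coroutine's context, so it follows the coroutine from thread to thread instead of staying behind on the thread where it was set.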

3. Let's talk about Continuation

Now for the hardest part (for me personally). As we discussed in the first part, there is an interesting object called Continuation.

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}
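To see this interface at work without any library, here is a small sketch that uses only the standard library. It starts a suspending lambda with startCoroutine and passes a hand-made Continuation that receives the final result. The helper name runToCompletion is invented for the example.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: runs a tiny coroutine and captures what is
// passed to the completion continuation's resumeWith.
fun runToCompletion(): Int? {
    var value: Int? = null
    // The Continuation(...) factory builds a Continuation from a context and a resumeWith lambda.
    val completion = Continuation<Int>(EmptyCoroutineContext) { result ->
        value = result.getOrNull()
    }
    val block: suspend () -> Int = { 40 + 2 }
    // The block never suspends, so it completes before startCoroutine returns.
    block.startCoroutine(completion)
    return value
}

fun main() {
    println(runToCompletion()) // 42
}
```

The completion continuation is where the coroutine "reports back": when the body finishes, its result arrives through resumeWith.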

Its base implementation is the class BaseContinuationImpl:

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable

BaseContinuationImpl is the abstract base class for the compiler-generated Continuation implementations. It defines their common behavior, including the resumeWith method, which resumes execution of the coroutine with the given result. resumeWith always calls invokeSuspend(), an abstract method implemented in the class that the compiler generates for the coroutine body.

The next class is ContinuationImpl, which inherits from BaseContinuationImpl. Its intercepted() function passes the continuation to the ContinuationInterceptor from the context; when that interceptor is a CoroutineDispatcher, the result is a DispatchedContinuation, which is itself a Continuation.

DispatchedContinuation wraps the Continuation of the coroutine body and holds a reference to the dispatcher. Its job is to use that dispatcher to schedule the resumption of the coroutine body on the appropriate thread.

Putting it all together:

  1. There is a CoroutineDispatcher. What is it? CoroutineDispatcher is like a postman that delivers coroutines (tasks) to different threads.

  2. What is Continuation? Continuation is like a zip code that specifies where the result should be delivered when a suspended computation resumes or completes.

  3. How does it work? When a coroutine is started, its Continuation is handed to the CoroutineDispatcher, which wraps it in a special DispatchedContinuation. This envelope contains both the original Continuation and the dispatcher that will deliver it.

  4. Why is it important? This allows coroutines to run concurrently on different threads, making your code more efficient.
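The wrapping step from the list above can be imitated with the standard library alone. The sketch below is not how DispatchedContinuation is implemented; LoggingInterceptor is a made-up class that wraps the continuation and records each resumption, where a real dispatcher would schedule the resumption on one of its threads.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical interceptor: wraps every continuation in an "envelope".
class LoggingInterceptor(private val log: MutableList<String>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "resume"                  // a dispatcher would hand this off to a thread
                continuation.resumeWith(result)  // here we resume the body right away
            }
        }
}

// Hypothetical helper: runs a coroutine body through the interceptor.
fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    val completion = Continuation<Unit>(LoggingInterceptor(log)) { log += "completed" }
    val block: suspend () -> Unit = { log += "body" }
    block.startCoroutine(completion)
    return log
}

fun main() {
    println(runIntercepted()) // [resume, body, completed]
}
```

The body runs only when the wrapper lets it: the interceptor sits between "resume this coroutine" and the coroutine code itself, which is the position a dispatcher uses to pick a thread.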

Summary

In this part I tried to explain several things in an accessible way: Coroutine scope, CoroutineContext, Continuation. To be honest, this was quite a difficult part for me, since I studied all the material in fits and starts.

By the way, for the upcoming part on speed comparisons in practice, I would like to know what problems you would solve (or have solved) using coroutines.

Thank you for reading, I look forward to your feedback!
