The magic of Dispatchers and how to make your own Main

I think now there are no people left who are unfamiliar with coroutines in Kotlin. Magic tool, right? Even more magical about them, I find it possible to move the calculation to another thread:

fun main() = runBlocking {
	println("Hello from ${Thread.currentThread().name}")
  withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Результат:
// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to main

Just one line and we get hello from another thread. How does this mechanism work? It’s actually painfully simple when you consider CoroutineDispatcheryou can see two important methods there:

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

The first is responsible for the need to call dispatchbut the second one is a little more interesting, it is responsible for executing the passed to block Runnable in another thread. It is worth noting that dispatch should guarantee the fulfillment block, otherwise you can say deadlock and coroutine never will not continue execution, so the method should not not delayed fulfill blockif necessary, continue execution on the current thread, it is better to return false from isDispatchNeeded.

Standard?

We have 4 standard dispaters that can be accessed from the class Dispatchers: Default, Unconfined, Main, IO.

unconfined: The simplest of them is Unconfinedwhich will not change the flow of execution and the code from the beginning of the article will no longer be so interesting:

runBlocking(Dispatchers.Unconfined) {
	println("Hello from ${Thread.currentThread().name}")
	withContext(Dispatchers.Default) {
    println("Hello from ${Thread.currentThread().name}")
	}

	println("Welcome back to ${Thread.currentThread().name}")
}

// Результат:
// Hello from main
// Hello from DefaultDispatcher-worker-2
// Welcome back to DefaultDispatcher-worker-2)

This is achieved through isDispatchNeeded which always returns false and does not allow calling dispatch(To be fair, it should be noted that dispatch can still be called from yield(), but that’s a completely different story).

Default,IO: These two are based on ExecutorCoroutineDispatcher with difficulties in the implementation of their Executor'аDefault inherits SchedulerCoroutineDispatcherwhich performs tasks by sending them to CoroutineSchedulerit’s custom Executorwith a thread pool equal to the number of processor threads (minimum 2) for CPU-bound tasks, which can expand up to maxPoolSizeequal to the system parameter kotlinx.coroutines.scheduler.max.pool.sizeor 2097150 maximum, for blocking tasks (IO). IO Dispatcher works through Default limiting itself to threads equal to the system parameter kotlinx.coroutines.io.parallelism or the number of processor threads (minimum 64)). CoroutineScheduler should understand the blocking task or not, and this is implemented by the method dispatchWithContext at SchedulerCoroutineDispatcher, where the task type is explicitly specified: BlockingContext from IO and non-blocking for any task from Default.

Main: The thing that started it all. The coroutines themselves(coroutine-core) do not provide an implementation MainCoroutineDispatcher, but only the mechanism for loading it. Loading class MainDispatcherLoaderwhich uses ServiceLoader and FastServiceLoader(Only used for android)which explicitly tries to initialize kotlinx.coroutines.android.AndroidDispatcherFactory. If a MainDispatcherLoader does not find implementations MainDispatcherFactory or createDispatcher will throw an exception, will create a standard MissingMainCoroutineDispatcher, throwing exceptions to everything.

Consider the implementation in Android:

AT Android MainCoroutineDispatcher implemented on the basis handler, is engaged in initialization AndroidDispatcherFactory:

override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
	val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
	return HandlerContext(mainLooper.asHandler(async = true))
}

@VisibleForTesting
internal fun Looper.asHandler(async: Boolean): Handler {
	// Async support was added in API 16.
	if (!async || Build.VERSION.SDK_INT < 16) {
		return Handler(this)
	}
	if (Build.VERSION.SDK_INT &gt;= 28) {
    // TODO compile against API 28 so this can be invoked without reflection.
    val factoryMethod = Handler::class.java.getDeclaredMethod("createAsync", Looper::class.java)
    return factoryMethod.invoke(null, this) as Handler
	}

	val constructor: Constructor&lt;Handler&gt;
	try {
    constructor = Handler::class.java.getDeclaredConstructor(Looper::class.java,
        Handler.Callback::class.java, Boolean::class.javaPrimitiveType)
	} catch (ignored: NoSuchMethodException) {
    // Hidden constructor absent. Fall back to non-async constructor.
    return Handler(this)
	}
	return constructor.newInstance(this, null, true)
}

Myself HandlerContext implements MainCoroutineDispatcher With Delay and pushes execution to the main thread using Handler::post:

override fun dispatch(context: CoroutineContext, block: Runnable) {
	if (!handler.post(block)) {
		cancelOnRejection(context, block)
	}
}

Delay is also needed to redefine the mechanism of the function delay()which runs on a dedicated thread by default, on Android will this work through handler.postDelayed. You can also see the implementation here. isDispatchNeededwhich for MainCoroutineDispatcher.immediate won’t call dispatch provided you are already on the main thread.

Custom implementation of MainCoroutineDispatcher: It became interesting to me how to drag coroutines into an existing Java project with an already implemented Event-Loop. Fortunately, I have a game server for experiments written entirely in Java, running in several threads with an Event-Loop on the main one. Start with implementation MainCoroutineDispatcher:

internal class ServerDispatcher(
	private val invokeImmediately: Boolean
) : MainCoroutineDispatcher() {
	@Volatile
	private var _immediate = if (invokeImmediately) this else null

	override val immediate = _immediate ?: ServerDispatcher(true).also { _immediate = it }

	override fun isDispatchNeeded(context: CoroutineContext): Boolean =
    !invokeImmediately || !Server.getInstance().isPrimaryThread

	override fun dispatch(context: CoroutineContext, block: Runnable) {
    Server.getInstance().scheduler.scheduleTask(block)
	}

	override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    throw UnsupportedOperationException("limitedParallelism is not supported for ${this::class.qualifiedName}")
	}
}

Here isDispatchNeeded no different from that in android, difference here in dispatchwho puts Runnable to the queue of tasks that are parsed and executed in a loop on the main thread. Let’s see how it works:

val scope = CoroutineScope(ServerDispatcher(invokeImmediately = false) + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// Результат:
// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

We made sure that everything works, now it’s time to make the download. We create a factory:

class ServerDispatcherFactory : MainDispatcherFactory {
	override val loadPriority: Int = Int.MAX_VALUE
	override fun createDispatcher(
    allFactories: List<MainDispatcherFactory>,
  ): MainCoroutineDispatcher = ServerDispatcher(invokeImmediately = false)
}

Go to resources and put the file kotlinx.coroutines.internal.MainDispatcherFactory in META-INF/services with content:

dev.israpil.coroutines.mygameserver.ServerDispatcherFactory

We check:

val scope = CoroutineScope(Dispatchers.Main.immediate + SupervisorJob())
scope.launch {
	logger.info("First message from coroutine!!")
	delay(3000)
	logger.info("Message from coroutine after 3000ms delay on ${Thread.currentThread().name} thread!")
	withContext(Dispatchers.IO) {
    logger.info("Message from other context: ${Thread.currentThread().name}")
	}

	logger.info("You again on ${Thread.currentThread().name} thread!!!")
}

// Результат:
// 16:04:55 [INFO ] First message from coroutine!!
// 16:04:58 [INFO ] Message from coroutine after 3000ms delay on main thread!
// 16:04:58 [INFO ] Message from other context: DefaultDispatcher-worker-1
// 16:04:58 [INFO ] You again on main thread!!!

Enjoy coroutines.

Similar Posts

Leave a Reply