Solving the race condition problem in Kotlin coroutines

Coroutine synchronization in Android is essential for safe and efficient multithreading. Coroutines make it easier to manage asynchronous tasks, but without proper synchronization, problems such as race conditions can arise and cause the application to behave incorrectly.

Let's imagine that we need to change a variable from several coroutines in the following code:

import kotlinx.coroutines.*

var counter = 0

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            counter++ // unsynchronized read-modify-write on shared state
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

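In this code snippet, we declare a global variable counter and launch two coroutines, each of which increments counter 1000 times. We expect the output to be 2000, but the result is not guaranteed: it varies from run to run and is often less than 2000. This code is a simple example of a race condition. The operation counter++ is not atomic (it reads the value, adds one, and writes the result back), so when two coroutines execute it on different threads at the same time, one update can overwrite the other. A race condition occurs when multiple coroutines access or modify shared data at the same time, which can lead to incorrect and unpredictable results.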
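To see why increments get lost, it can help to write out counter++ as its separate read and write steps. The sketch below is not concurrent code; it replays one unlucky interleaving by hand, and the names lostUpdateDemo, readByA and readByB are made up for this illustration.

```kotlin
// Deterministic replay of one unlucky interleaving of two `counter++` calls.
// No real concurrency is involved; the steps are simply written out in order.
fun lostUpdateDemo(): Int {
    var counter = 0

    val readByA = counter   // coroutine A reads 0
    val readByB = counter   // coroutine B reads 0 before A has written back
    counter = readByA + 1   // A writes 1
    counter = readByB + 1   // B also writes 1, overwriting A's update

    return counter
}

fun main() {
    println(lostUpdateDemo()) // prints 1, although two increments ran
}
```

Every solution below works by making sure these steps cannot interleave, either by serializing access or by confining the state to a single coroutine.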

There are 6 ways to solve the race condition problem (at least that I know of).

Mutex

A Mutex (mutual exclusion lock, similar to locks in multithreading) prevents multiple coroutines from accessing a shared resource at the same time. It ensures that only one coroutine can hold the lock at a time. Coroutines that attempt to acquire a locked Mutex are suspended until it is released.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

var counter = 0
val mutex = Mutex()

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            mutex.withLock {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

Use case: protecting shared mutable state to prevent race conditions.
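One possible refinement is to keep the Mutex and the state it guards together in one class, so callers cannot touch the value without taking the lock. This is only a sketch; the class name SafeCounter and its methods are hypothetical and not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical wrapper: the value is private, so every access goes through the mutex.
class SafeCounter {
    private val mutex = Mutex()
    private var value = 0

    suspend fun increment() {
        mutex.withLock { value++ }
    }

    suspend fun get(): Int = mutex.withLock { value }
}

fun main() = runBlocking {
    val counter = SafeCounter()
    List(2) {
        launch(Dispatchers.Default) {
            repeat(1000) { counter.increment() }
        }
    }.joinAll()
    println(counter.get()) // prints 2000
}
```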

Atomic operations

Atomic variables allow changes to be made safely without the need for explicit locks. Using classes like AtomicInteger or AtomicReference, you can safely update values from multiple coroutines in a lock-free, thread-safe manner.

import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

val counter = AtomicInteger(0)

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            counter.incrementAndGet()
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter.get())
}

Use case: Lockless synchronization for lightweight shared state updates.
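The paragraph above also mentions AtomicReference, which is useful when the shared state is more than a single number. The following is a minimal sketch under the assumption that the state is an immutable data class; Stats and recordAll are hypothetical names introduced for this example.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*

// Hypothetical immutable state holder used only for this illustration.
data class Stats(val count: Int, val sum: Int)

suspend fun recordAll(): Stats = coroutineScope {
    val stats = AtomicReference(Stats(count = 0, sum = 0))

    List(2) {
        launch(Dispatchers.Default) {
            repeat(1000) {
                // updateAndGet may re-run the lambda if another thread changed the
                // reference in the meantime, so the lambda should have no side effects.
                stats.updateAndGet { old -> Stats(old.count + 1, old.sum + 2) }
            }
        }
    }.joinAll()

    stats.get()
}

fun main() = runBlocking {
    println(recordAll()) // prints Stats(count=2000, sum=4000)
}
```

Because both fields are replaced together in a single atomic swap, readers never observe a count that does not match the sum.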

Channels

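Channels are used to send and receive data between coroutines in a thread-safe manner, often for producer-consumer patterns. A Channel allows coroutines to send data to each other asynchronously. The receiving coroutine suspends until data is available. In the example below, only the receiver modifies counter, so no lock is needed.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

var counter = 0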

suspend fun increment(channel: Channel<Int>) {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            channel.send(1) // Send increment request to the channel
        }
    }
}

fun main() = runBlocking {
    val channel = Channel<Int>()

    val receiver = launch {
        for (value in channel) {
            counter += value
        }
    }

    List(2) {
        launch { increment(channel) }
    }.onEach { it.join() }

    channel.close()
    receiver.join()
    println(counter)
}

Use case: synchronizing data flow between coroutines (e.g. producer-consumer).

Actors

Actors provide a coroutine-based way to implement the actor model of concurrency, where coroutines communicate by sending messages. An actor is a coroutine that processes messages one at a time, which keeps its internal state consistent and avoids the need for locks.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

sealed class CounterMsg
object Increment : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

// The actor builder is marked @ObsoleteCoroutinesApi, so we opt in explicitly
@OptIn(ObsoleteCoroutinesApi::class)
fun CoroutineScope.counterActor(): SendChannel<CounterMsg> = actor<CounterMsg> {
    var counter = 0 // state is confined to the actor coroutine
    for (msg in channel) { // messages are processed one at a time
        when (msg) {
            is Increment -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

suspend fun increment(channel: SendChannel<CounterMsg>) {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            channel.send(Increment)
        }
    }
}

fun main() = runBlocking {
    val counterActor = counterActor()

    List(2) {
        launch { increment(counterActor) }
    }.onEach { it.join() }

    val response = CompletableDeferred<Int>()
    counterActor.send(GetCounter(response))
    counterActor.close()
    val result = response.await()
    println(result)
}

Use case: state management using message passing.

Semaphore

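Semaphores are used to limit the number of coroutines that can simultaneously access a shared resource. A Semaphore holds a fixed number of permits, which coroutines acquire and release. Coroutines that try to acquire a permit when none are available are suspended. With a single permit, as in the example below, a Semaphore behaves like a Mutex.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

var counter = 0
val semaphore = Semaphore(1)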

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            semaphore.withPermit {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.onEach { it.join() }
    println(counter)
}

Use case: limiting the number of simultaneous accesses to a resource (for example, database connections, network requests).
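The use case above involves more than one permit, so here is a rough sketch of that scenario. It assumes a slow resource simulated with delay, and the function peakConcurrency with its parameters is invented for this example; it simply records how many coroutines were inside the guarded section at the same time.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Returns the highest number of coroutines observed inside the guarded section.
suspend fun peakConcurrency(permits: Int, workers: Int): Int = coroutineScope {
    val semaphore = Semaphore(permits)
    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)

    List(workers) {
        launch(Dispatchers.Default) {
            semaphore.withPermit {
                val now = active.incrementAndGet()
                peak.updateAndGet { max -> maxOf(max, now) }
                delay(50) // stand-in for a slow resource such as a network call
                active.decrementAndGet()
            }
        }
    }.joinAll()

    peak.get()
}

fun main() = runBlocking {
    val peak = peakConcurrency(permits = 3, workers = 10)
    println(peak <= 3) // prints true: at most 3 coroutines held a permit at once
}
```

Note that with more than one permit the semaphore only limits concurrency; it no longer makes a plain counter++ inside the section safe, which is why the sketch uses AtomicInteger for its bookkeeping.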

SharedFlow or StateFlow

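SharedFlow and StateFlow can also be used to share state and pass updates between multiple coroutines. In the example below, the producers emit increment requests into a MutableSharedFlow and a single collector applies them, so counter is only ever modified from one coroutine. Note that a MutableSharedFlow created with default parameters does not keep values emitted while it has no subscribers, which is why the collector is launched before the producers.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

val counterFlow = MutableSharedFlow<Int>()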

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            counterFlow.emit(1)
        }
    }
}

fun main() = runBlocking {
    var counter = 0

    val collectorJob = launch {
        counterFlow.collect { value ->
            counter += value
        }
    }

    List(2) {
        launch { increment() }
    }.onEach { it.join() }

    collectorJob.cancel()

    println(counter)
}
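The heading also names StateFlow, so here is a small sketch of that variant. It assumes the counter itself is held in a MutableStateFlow and uses the update extension function from kotlinx.coroutines.flow; the name counterState is chosen for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Assumed setup for illustration: the flow itself holds the current count.
val counterState = MutableStateFlow(0)

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            // update applies the lambda atomically, retrying with
            // compare-and-set if another coroutine changed the value first
            counterState.update { it + 1 }
        }
    }
}

fun main() = runBlocking {
    List(2) {
        launch { increment() }
    }.joinAll()
    println(counterState.value) // prints 2000
}
```

Writing counterState.value = counterState.value + 1 instead would reintroduce the race, because the read and the write would again be separate steps.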

Conclusion:

Kotlin provides several ways to synchronize coroutines depending on the use case, including Mutex for mutual exclusion, Channels for safe communication, Semaphore for limiting concurrency, and more. Used correctly, these tools let coroutines work together without causing race conditions or inconsistent state in concurrent programs.
