Comparison of RxJava 3 and Kotlin Coroutines Flow Operators
This article covers only the most popular operators, each with code examples:
Transformation Operators
RxJava map / Flow map:
Transforms elements by applying a function to each of them. Behaves the same way in both libraries.
Example code
// RxJava map
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.range(1, 5)
        .map { it * 2 }
        .subscribe(::println) // Output: 2, 4, 6, 8, 10
}
// Flow map
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    (1..5).asFlow()
        .map { it * 2 }
        .collect { println(it) } // Output: 2, 4, 6, 8, 10
}
RxJava flatMap / Flow flatMapMerge:
Maps each value to an inner stream and merges the results. Unlike RxJava concatMap / Flow flatMapConcat, RxJava flatMap / Flow flatMapMerge handles the inner streams concurrently rather than sequentially. The inner streams run at the same time and their values are emitted as soon as they are ready, not one stream at a time, so the output order may differ from the order of the source elements. Note that Flow flatMapMerge limits the number of concurrently collected inner flows (16 by default), while RxJava flatMap has no such limit by default. Use these operators when processing order is not critical and concurrent processing improves performance.
Example code
// RxJava flatMap
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
    Observable.just(1, 2, 3)
        .flatMap { i ->
            Observable.just(i * 10)
                .concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS))
        }
        .blockingSubscribe(::println)
}
// Flow flatMapMerge
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    flowOf(1, 2, 3)
        .flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } }
        .collect { println(it) }
}
RxJava concatMap / Flow flatMapConcat:
Maps each element of the source stream to an inner stream and concatenates the inner streams into a single stream. The inner streams are processed strictly one at a time: the next stream starts only after the previous one has completed, so the order of the source elements is preserved.
Example code
// RxJava concatMap
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.just(1, 2, 3)
        .concatMap { i -> Observable.just(i * 10, i * 20) }
        .subscribe(::println) // Output: 10, 20, 20, 40, 30, 60
}
// Flow flatMapConcat
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    flowOf(1, 2, 3)
        .flatMapConcat { flowOf(it * 10, it * 20) }
        .collect { println(it) } // Output: 10, 20, 20, 40, 30, 60
}
RxJava buffer / Flow buffer:
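Despite the shared name, these two operators do different things. RxJava buffer(count) groups elements into fixed-size lists. It is used to process elements in batches, which can improve performance when handling each element individually ties up resources unnecessarily.
Flow buffer() does not group anything: it places a channel between the emitter and the collector so that they run concurrently, and the collector still receives elements one at a time. The closest Flow counterpart to RxJava buffer(count) is chunked(size), which is available as an experimental API in recent kotlinx.coroutines releases.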
Example code
// RxJava buffer
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.just(1, 2, 3, 4, 5)
        .doOnNext { println("Emitting $it") }
        .buffer(2) // Group elements into lists of 2
        .subscribe { buffer ->
            println("Collected $buffer")
        }
    Thread.sleep(1000)
}
// Flow buffer
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .onEach { println("Emitting $it") }
        .buffer() // Run emitter and collector concurrently; elements still arrive one by one
        .collect { value ->
            println("Collected $value")
        }
}
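To batch elements in Flow the way RxJava buffer(count) does, one option is a small custom operator. The sketch below is only an illustration: inBatchesOf is a hypothetical name, not part of kotlinx.coroutines, and it assumes that the trailing partial batch should be emitted on completion, as RxJava buffer(count) does.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library function): groups elements into lists of `size`
fun <T> Flow<T>.inBatchesOf(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    var batch = ArrayList<T>(size)
    collect { value ->
        batch.add(value)
        if (batch.size == size) {
            emit(batch)
            // Start a fresh list so already emitted batches are never mutated
            batch = ArrayList(size)
        }
    }
    // Emit the trailing partial batch once the source completes
    if (batch.isNotEmpty()) emit(batch)
}

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .inBatchesOf(2)
        .collect { println("Collected $it") }
}
```

With the input above this should print Collected [1, 2], Collected [3, 4] and Collected [5].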
RxJava scan / Flow scan:
Applies an accumulator function to the elements in order and emits every intermediate result, starting with the initial value. Use it to build up a value where each step depends on what has been accumulated so far, such as a running total.
Example code
// RxJava scan
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.fromArray(1, 2, 3)
        .scan(0) { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println(result) // Output: 0, 1, 3, 6
        }
}
// Flow scan
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
    // Create a Flow of the numbers 1 to 3
    val flow = (1..3).asFlow()
    // Apply the scan operator
    flow.scan(0) { accumulator, value ->
        accumulator + value
    }.collect { result ->
        println(result) // Output: 0, 1, 3, 6
    }
}
RxJava groupBy / no direct counterpart in Flow:
Splits a stream into several streams grouped by keys.
Example code
// RxJava groupBy
import io.reactivex.rxjava3.core.Observable
fun main() {
    val observable = Observable.just(
        "one", "two", "three", "four", "five", "six"
    )
    observable.groupBy { it.length }
        .flatMapSingle { group ->
            // Collect each group into a list and pair it with the group key
            group.toList().map { list -> Pair(group.key, list) }
        }
        .subscribe { pair ->
            println("Length: ${pair.first}, Words: ${pair.second}")
        }
}
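Although Flow has no groupBy operator, a finite flow can be grouped by collecting it into a map. The following is a minimal sketch under that assumption. groupToMap is a hypothetical helper name, and unlike RxJava groupBy it produces no results until the source flow completes, so it is not suitable for infinite flows.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not a library function): collects a finite flow into groups by key
suspend fun <T, K> Flow<T>.groupToMap(keyOf: (T) -> K): Map<K, List<T>> {
    // LinkedHashMap keeps the groups in the order their keys first appear
    val groups = LinkedHashMap<K, MutableList<T>>()
    collect { value ->
        groups.getOrPut(keyOf(value)) { mutableListOf() }.add(value)
    }
    return groups
}

fun main() = runBlocking {
    flowOf("one", "two", "three", "four", "five", "six")
        .groupToMap { it.length }
        .forEach { (length, words) -> println("Length: $length, Words: $words") }
}
```

If groups must be processed while the source is still emitting, this approach does not apply and a different design (for example, routing elements into per-key channels) would be needed.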
Filtering
RxJava distinctUntilChanged / Flow distinctUntilChanged:
Suppresses consecutive duplicate elements.
Example code
// RxJava distinctUntilChanged
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.just(1, 1, 2, 2, 3, 1)
        .distinctUntilChanged()
        .subscribe(::println) // Output: 1, 2, 3, 1
}
// Flow distinctUntilChanged
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    flowOf(1, 1, 2, 2, 3, 1)
        .distinctUntilChanged()
        .collect { println(it) } // Output: 1, 2, 3, 1
}
RxJava filter / Flow filter:
Selects elements that match a condition.
Example code
// RxJava filter
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.range(1, 5)
        .filter { it % 2 == 0 }
        .subscribe(::println) // Output: 2, 4
}
// Flow filter
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    (1..5).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) } // Output: 2, 4
}
RxJava take / Flow take:
Takes the first N elements from the incoming stream.
Example code
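// RxJava take
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.range(1, 10)
        .take(3)
        .subscribe(::println) // Output: 1, 2, 3
}
// Flow take
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    (1..10).asFlow()
        .take(3)
        .collect { println(it) } // Output: 1, 2, 3
}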
RxJava skip / Flow drop:
Skips the first N elements.
Example code
// Flow drop
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .drop(2) // Skip the first 2 elements
        .collect { value ->
            println(value) // Prints 3, 4, 5
        }
}
// RxJava skip
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.just(1, 2, 3, 4, 5)
        .skip(2) // Skip the first 2 elements
        .subscribe { value ->
            println(value) // Prints 3, 4, 5
        }
}
Combination
RxJava merge / Flow merge:
Combines the given streams into a single stream. All streams are collected concurrently, with no limit on how many are collected at the same time, and elements are emitted as they arrive, so the relative order of elements from different streams is not guaranteed.
Example code
// Flow merge
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val flow1 = flow {
        repeat(3) {
            emit("From flow1: $it")
            delay(500L) // Emit every 500 ms
        }
    }
    val flow2 = flow {
        repeat(3) {
            emit("From flow2: $it")
            delay(1000L) // Emit every 1000 ms
        }
    }
    val startTime = System.currentTimeMillis()
    merge(flow1, flow2).collect { value ->
        println("$value at ${System.currentTimeMillis() - startTime} ms")
    }
}
// RxJava merge
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable1: $it" }
    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable2: $it" }
    val startTime = System.currentTimeMillis()
    Observable.merge(observable1, observable2)
        .subscribe { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms")
        }
    // Wait for the streams to complete
    Thread.sleep(4000)
}
RxJava zip / Flow zip:
Pairs elements from the streams by position and applies a combining function to each pair. An element that has no counterpart in the other stream is not emitted.
Example code
// RxJava zip
import io.reactivex.rxjava3.core.Observable
fun main() {
    val nums = Observable.range(1, 3)
    val letters = Observable.just("A", "B", "C", "D")
    Observable.zip(nums, letters) { n, l -> "$n -> $l" }
        .subscribe(::println)
    // Output: 1 -> A, 2 -> B, 3 -> C (D is not emitted)
}
// Flow zip
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val letters = flowOf("A", "B", "C", "D")
    numbers.zip(letters) { n, l -> "$n -> $l" }
        .collect { println(it) }
    // Output: 1 -> A, 2 -> B, 3 -> C (D is not emitted)
}
RxJava combineLatest / Flow combine:
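Combines streams by applying a function to the latest element of each one. A new value is emitted whenever any of the streams emits, once every stream has produced at least one element.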
Example code
// Flow combine
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val flow1 = flow {
        repeat(3) {
            emit("String: s$it")
            delay(500L) // Emit every 500 ms
        }
    }
    val flow2 = flow {
        repeat(3) {
            emit(it)
            delay(1000L) // Emit every 1000 ms
        }
    }
    flow1.combine(flow2) { str, num ->
        "$str and Int: $num"
    }.collect { value ->
        println(value)
    }
}
// RxJava combineLatest
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "String: s$it" }
    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)
    Observable.combineLatest(observable1, observable2) { str: String, num: Long ->
        "$str and Int: $num"
    }
        .subscribe { value ->
            println(value)
        }
    // Wait for the streams to complete
    Thread.sleep(5000L)
}
Aggregation
RxJava reduce / Flow reduce:
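The reduce operator in Kotlin Flow is similar to the operator of the same name in RxJava: both apply an accumulator function to all elements and return a single value. Without a seed, the first element serves as the initial accumulator value. For an empty stream, RxJava reduce then returns a Maybe that completes without a value, while Flow reduce throws NoSuchElementException. To start from a seed, use the RxJava reduce overload that takes an initial value (it returns a Single) or Flow fold.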
Example code
// Flow reduce
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val result = (1..5).asFlow()
        .reduce { accumulator, value ->
            accumulator + value
        }
    println("Flow Reduce Result: $result") // 15
}
// RxJava reduce
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.fromIterable(listOf(1, 2, 3, 4, 5))
        .reduce { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println("RxJava Reduce Result: $result") // 15
        }
}
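When the aggregation needs an explicit starting value, Flow offers fold instead of reduce. The sketch below illustrates this; sumWithSeed is a hypothetical helper name introduced only for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sums the elements starting from the seed 0
suspend fun sumWithSeed(values: Flow<Int>): Int =
    values.fold(0) { accumulator, value -> accumulator + value }

fun main() = runBlocking {
    println("Sum of 1..5: ${sumWithSeed((1..5).asFlow())}")       // 15
    // With a seed, an empty flow yields the seed instead of throwing
    println("Sum of empty flow: ${sumWithSeed(emptyFlow())}")     // 0
}
```

Because the seed doubles as the result for an empty flow, fold is usually the safer choice when the source may emit nothing.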
RxJava count / Flow count:
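Counts the elements in a stream. Flow count accepts an optional predicate and then counts only the matching elements; RxJava count takes no predicate, so apply filter first to count only the elements that satisfy a condition.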
Example code
// Flow count
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val count = (1..10).asFlow()
        .count { it % 2 == 0 } // Count the even numbers
    println("Flow Count Result: $count") // 5
}
// RxJava count
import io.reactivex.rxjava3.core.Observable
fun main() {
    Observable.fromIterable(1..10)
        .filter { it % 2 == 0 } // Keep only the even numbers
        .count() // Count the elements that remain after filtering
        .subscribe { count ->
            println("RxJava Count Result: $count") // 5
        }
}
Time management
RxJava debounce / Flow debounce:
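Drops an element if another one follows it within the given timeout. Only elements followed by a quiet period of at least that length are emitted, plus the last element when the stream completes.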
Example code
// RxJava debounce
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
    // Elements arrive every 100 ms, so typically only the last one gets through
    Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(10)
        .debounce(150, TimeUnit.MILLISECONDS) // Emits only elements followed by 150 ms of silence
        .subscribe(::println)
    Thread.sleep(2000)
}
// Flow debounce
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    // Elements arrive every 100 ms, so typically only the last one gets through
    (1..10).asFlow()
        .onEach { delay(100L) }
        .debounce(150L) // Emits only elements followed by 150 ms of silence
        .collect { println(it) }
}
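Because the examples above emit at a constant rate, they do not show which elements debounce keeps when the pauses vary. The sketch below, modeled on the example in the kotlinx.coroutines documentation for debounce, uses uneven pauses; unevenSource is a hypothetical helper name introduced for illustration.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source with uneven pauses between emissions
fun unevenSource(): Flow<Int> = flow {
    emit(1)
    delay(90)   // 2 follows too soon, so 1 is dropped
    emit(2)
    delay(90)   // 3 follows too soon, so 2 is dropped
    emit(3)
    delay(1010) // more than 1000 ms of silence, so 3 is emitted
    emit(4)
    delay(1010) // more than 1000 ms of silence, so 4 is emitted
    emit(5)     // last element, emitted when the flow completes
}

@OptIn(FlowPreview::class)
fun main() = runBlocking {
    unevenSource()
        .debounce(1000)
        .collect { println(it) } // 3, 4, 5
}
```

The same pattern is what makes debounce useful for inputs such as search fields: intermediate values typed in quick succession are dropped and only the value the user pauses on is processed.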
RxJava delay / Flow – another approach
In RxJava, delay postpones the emission of elements by the specified time. Flow has no such operator; instead, call the suspending delay function from kotlinx.coroutines inside the flow builder or inside operators such as onEach.
Example code
// RxJava delay
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit
fun main() {
    val startTime = System.currentTimeMillis()
    Observable.range(1, 5)
        .concatMap { item ->
            Observable.just(item)
                .delay(1000, TimeUnit.MILLISECONDS) // Delay each element
        }
        .subscribe { value ->
            println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms")
        }
    // Keep the application alive until all elements have been emitted
    Thread.sleep(6000)
}
// Flow: delay in onEach
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    (1..5).asFlow()
        .onEach { delay(1000) } // Delay before each element
        .collect { value ->
            println("Flow emitted $value " +
                "at ${System.currentTimeMillis() - startTime} ms")
        }
}
I hope this article gave you a clearer picture of how the operators in RxJava and Kotlin Flow compare, and that it helps you migrate from one to the other more quickly. I also tried to include many timing diagrams to make the behavior of each operator easier to understand.
Thank you for your attention!