Comparison of RxJava 3 and Kotlin Coroutines Flow Operators

In this article we will consider only the most popular operators and code examples:

Conversion Operators

RxJava map / Flow map:

Transforms elements by applying a function to each of them. Works consistently.

Flow map

Flow map

Example code
//RxJava map
fun main() {
  Observable.range(1, 5)
    .map { it * 2 }
    .subscribe(::println) // Вывод: 2, 4, 6, 8, 10
}
//Flow map
fun main() = runBlocking {
  (1..5).asFlow()
    .map { it * 2 }
    .collect { println(it) } // Вывод: 2, 4, 6, 8, 10
}

RxJava flatMap / Flow flatMapMerge:

Converts each value to a stream. RxJava flatMap /Flow flatMapMerge unlike RxJava concatMap /Flow flatMapConcat processes elements of the source stream in parallel rather than sequentially. Threads for each element run simultaneously and emit values ​​in the order they are ready, rather than one at a time, so the order of the values ​​may differ and may not match the original one. Should be used when processing order is not critical and performance benefits can be gained from parallel processing.

Flow flatMapMerge

Flow flatMapMerge

Example code
//RxJava flatMap
fun main() {
    Observable.just(1, 2, 3)
        .flatMap { i ->
            Observable.just(i * 10)
                .concatWith(Observable.just(i * 20).delay(10, TimeUnit.MILLISECONDS))
        }
        .blockingSubscribe(::println)
}
//Flow flatMapMerge
fun main() = runBlocking {
  flowOf(1, 2, 3)
    .flatMapMerge { flow { emit(it * 10); delay(10); emit(it * 20) } }
    .collect { println(it) }
}

RxJava concatMap / Flow flatMapConcat:

Converts each value into a stream and concatenates them sequentially. Takes each element from the original stream and, for each element, creates a new stream. These streams are then combined into one linear stream. It is important to note that the merging occurs one at a time: the next thread begins only after the previous one has completed.

Flow flatMapConcat

Flow flatMapConcat

Example code
//RxJava concatMap
fun main() {
  Observable.just(1, 2, 3)
    .concatMap { i -> Observable.just(i * 10, i * 20) }
    .subscribe(::println) // Вывод: 10, 20, 20, 40, 30, 60
}
//Flow flatMapConcat
fun main() = runBlocking {
  flowOf(1, 2, 3)
    .flatMapConcat { flowOf(it * 10, it * 20) }
    .collect { println(it) } // Вывод: 10, 20, 20, 40, 30, 60
}

RxJava buffer / Flow buffer:

Groups elements into fixed-size lists.

Used to process elements in buffers (or batches), which improves performance, especially in cases where processing each element individually blocks unnecessary resources.

Example code
//RxJava buffer
fun main() {
  Observable.just(1, 2, 3, 4, 5)
    .doOnNext { println("Emitting $it") }
    .buffer(2) // Буферизуем элементы по 2
    .subscribe { buffer ->
      println("Collected $buffer")
    }
  Thread.sleep(1000)
}
//Flow buffer
fun main() = runBlocking {
  flowOf(1, 2, 3, 4, 5)
    .onEach { println("Emitting $it") }
    .buffer() // Буферизуем все элементы
    .collect { value ->
      println("Collected $value")
    }
}

RxJava scan / Flow scan:

Applies a function to elements sequentially, returning intermediate results. Used to accumulate values ​​with the ability to take into account previous accumulated values.

Flow scan

Flow scan

Example code
//RxJava scan
fun main() {
    Observable.fromArray(1, 2, 3)
        .scan(0) { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println(result) // Вывод: 0, 1, 3, 6
        }
}
//Flow scan
fun main() = runBlocking<Unit> {
    // Создаем Flow от 1 до 3
    val flow = (1..3).asFlow()

    // Применяем оператор scan
    flow.scan(0) { accumulator, value ->
        accumulator + value
    }.collect { result ->
        println(result) // Вывод: 0, 1, 3, 6
    }
}

RxJava groupBy / There is no analogue on Flow:

Splits a stream into several streams grouped by keys.

Example code
fun main() {
  val observable = Observable.just(
    "one", "two", "three", "four", "five", "six"
  )

  observable.groupBy { it.length }
    .flatMapSingle { group ->
      group.toList().map { list -> Pair(group.key, list) }
    }
    .subscribe { pair ->
      println("Length: ${pair.first}, Words: ${pair.second}")
    }
}

Filtration

RxJava distinctUntilChanged / Flow distinctUntilChanged

Eliminates duplicate sequential elements.

Flow distinctUntilChanged

Flow distinctUntilChanged

Example code
//RxJava distinctUntilChanged
fun main() {
  Observable.just(1, 1, 2, 2, 3, 1)
    .distinctUntilChanged()
    .subscribe(::println) // Вывод: 1, 2, 3, 1
}
//Flow distinctUntilChanged
fun main() = runBlocking {
  flowOf(1, 1, 2, 2, 3, 1)
    .distinctUntilChanged()
    .collect { println(it) } // Вывод: 1, 2, 3, 1
}

RxJava filter / Flow filter:

Selects elements that match a condition.

Flow filter

Flow filter

Example code
//RxJava filter
fun main() {
  Observable.range(1, 5)
    .filter { it % 2 == 0 }
    .subscribe(::println) // Вывод: 2, 4
}
//Flow filter
fun main() = runBlocking {
  (1..5).asFlow()
    .filter { it % 2 == 0 }
    .collect { println(it) } // Вывод: 2, 4
}

RxJava take / Flow take:

Takes the first N elements from the incoming stream.

Flow take

Flow take

Example code
//RxJava take
fun main() {
  Observable.range(1, 10)
    .take(3)
    .subscribe(::println) // Вывод: 1, 2, 3
}
//Flow take
fun main() = runBlocking {
  (1..10).asFlow()
    .take(3)
    .collect { println(it) } // Вывод: 1, 2, 3
}

RxJava skip/Flow drop:

Skips the first N elements.

Flow drop

Flow drop

Example code
//Flow drop
fun main() = runBlocking {
  flowOf(1, 2, 3, 4, 5)
    .drop(2) // Пропускаем первые 2 элемента
    .collect { value ->
      println(value) // Выведет 3, 4, 5
    }
}
//RxJava skip
fun main() {
  Observable.just(1, 2, 3, 4, 5)
    .skip(2) // Пропускаем первые 2 элемента
    .subscribe { value ->
      println(value) // Выведет 3, 4, 5
    }
}

Combination

RxJava merge / Flow merge:

Combines the specified threads into a single thread. All streams are merged simultaneously, with no limit on the number of streams that can be collected simultaneously.

Flow merge

Flow merge

Example code
//Flow merge
fun main() = runBlocking {
  val flow1 = flow {
    repeat(3) { 
      emit("From flow1: $it")
      delay(500L) // Эмиссия каждые 500 мс
    }
  }

  val flow2 = flow {
    repeat(3) { 
      emit("From flow2: $it")
      delay(1000L) // Эмиссия каждые 1000 мс
    }
  }

  val startTime = System.currentTimeMillis()
  merge(flow1, flow2).collect { value ->
    println("$value at ${System.currentTimeMillis() - startTime} ms")
  }
}
//RxJava merge
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable1: $it" }

    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "From observable2: $it" }

    val startTime = System.currentTimeMillis()
    Observable.merge(observable1, observable2)
        .subscribe { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms")
        }

    // Ожидание завершения потоков
    Thread.sleep(4000)
}

RxJava zip/Flow zip:

Combines streams by applying a combine function to elements. An element for which there is no pair is not emitted.

Flow zip

Flow zip

Example code
//RxJava zip
fun main() {
  val nums = Observable.range(1, 3)
  val letters = Observable.just("A", "B", "C", "D")
  
  Observable.zip(nums, letters) { n, l -> "$n -> $l" }
    .subscribe(::println) 
  // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}
//Fllow zip
fun main() = runBlocking {
  val numbers = (1..3).asFlow()
  val letters = flowOf("A", "B", "C", "D")
  numbers.zip(letters) { n, l -> "$n -> $l" }
    .collect { println(it) } 
  // Вывод: 1 -> A, 2 -> B, 3 -> C //D - не эмиттится
}

RxJava combineLatest / Flow combine:

Merges streams using the last elements from each.

Flow combine

Flow combine

Example code
//Flow combine
fun main() = runBlocking {
    val flow1 = flow {
        repeat(3) {
            emit("String: s$it")
            delay(500L) // Эмиссия каждые 500 мс
        }
    }

    val flow2 = flow {
        repeat(3) {
            emit(it)
            delay(1000L) // Эмиссия каждые 1000 мс
        }
    }

    flow1.combine(flow2) { str, num ->
        "$str and Int: $num"
    }.collect { value ->
        println(value)
    }
}
//RxJava combineLatest
fun main() {
    val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .take(3)
        .map { "String: s$it" }

    val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
        .take(3)

    Observable.combineLatest(observable1, observable2) { str: String, num: Long ->
        "$str and Int: $num"
    }
        .subscribe { value ->
            println(value)
        }
    Thread.sleep(5000L)
}

Aggregation

RxJava reduce / Flow reduce:

In Kotlin Flow, the reduce operator is similar to the operator of the same name in RxJava. Both are used to convert a collection of values ​​into a single value using a seed and an aggregation (accumulator) function. Applies an aggregation function to all elements, returning a single value.

Example code
//Flow reduce
fun main() = runBlocking {
  val result = (1..5).asFlow()
    .reduce { accumulator, value ->
      accumulator + value
    }

  println("Flow Reduce Result: $result")
}
//RxJava reduce
fun main() {
    Observable.fromIterable(listOf(1, 2, 3, 4, 5))
        .reduce { accumulator, value ->
            accumulator + value
        }
        .subscribe { result ->
            println("RxJava Reduce Result: $result")
        }
}
  • count:

    Used to count the number of elements in a stream that satisfy a certain condition (or all elements if there is no condition)

RxJava count / Flow count:

Used to count the number of elements in a stream that satisfy a certain condition (or all elements if there is no condition)

Example code
//Flow count
fun main() = runBlocking {
    val count = (1..10).asFlow()
        .count { it % 2 == 0 } // Считаем количество четных чисел

    println("Flow Count Result: $count")
}
//RxJava count
fun main() {
    Observable.fromIterable(1..10)
        .filter { it % 2 == 0 } // Отфильтруем четные числа
        .count() // Подсчитываем количество элементов после фильтрации
        .subscribe { count ->
            println("RxJava Count Result: $count")
        }
}

Time management

RxJava debounce / Flow debounce:

Skips elements if they are generated too quickly.

Flow debounce

Flow debounce

Example code
//RxJava debounce
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

fun main() {
  Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(10)
    .debounce(150, TimeUnit.MILLISECONDS) // Испускает только элементы, 
  //за которыми следует 150мс тишины
    .subscribe(::println)

  Thread.sleep(2000)
}
//Flow debounce
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
  (1..10).asFlow()
    .onEach { delay(100L) }
    .debounce(150L) // Испускает только элементы, 
  //за которыми следует 150мс тишины
    .collect { println(it) } 
}

RxJava delay / Flow – another approach

In RxJava Delays the emission of elements for the specified time. Flow uses delay from kotlinx.coroutines.delay inside the builder flow or inside statements.

Example code
//RxJava delay
fun main() {
  val startTime = System.currentTimeMillis()

  Observable.range(1, 5)
    .concatMap { item ->
      Observable.just(item)
        .delay(1000, TimeUnit.MILLISECONDS) // Задержка каждого элемента
    }
    .subscribe { value ->
      println("RxJava emitted $value at ${System.currentTimeMillis() - startTime} ms")
    }

  // Чтобы приложение не завершилось раньше времени
  Thread.sleep(6000)
}
//Flow: delay в onEach
fun main() = runBlocking {
    val startTime = System.currentTimeMillis()

    (1..5).asFlow()
        .onEach { delay(1000) } // Задержка перед каждым элементом
        .collect { value ->
            println("Flow emitted $value " +
                    "at ${System.currentTimeMillis() - startTime} ms")
        }
}

I hope the article gave you a more detailed understanding of how various operators are compared in RxJava and Kotlin Flow and will help you more quickly migrate from one to the other. Also in the article I tried to use many time diagrams to better understand the principles of the operator’s work.

Thank you for your attention!

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *