RxPY is a library that implements the principles of functional reactive programming in Python. It lets you create asynchronous data streams and combine, filter, and transform them with chains of operators. The main components of RxPY:

  • Observable: a data source that can emit events.

  • Observer: a subscriber that reacts to events from the Observable.

  • Operators: functions that transform, filter, and combine Observables.

Installing RxPY couldn't be easier:

pip install reactivex

Version 4 of RxPY (this article uses 4.0.4) brought significant changes:

  • Module rename: the package is now imported as reactivex, not rx.

  • Improved typing: type annotations were added for better IDE support.

  • Updated operator usage: operators are chained through the pipe method.

If you worked with RxPY v3, here is what to change when migrating:

  • Import changes: import rx -> import reactivex as rx.

  • Operators: instead of observable.map(...), write observable.pipe(ops.map(...)). Code that already uses pipe only needs the import change.

  • Removed obsolete functions: some old operators and methods have been removed or renamed.

Creating an Observable and Working with Operators

Creating an Observable


rx.just creates an Observable that emits a single value.

import reactivex as rx

observable = rx.just(42)
observable.subscribe(lambda x: print(f"Value: {x}"))


rx.from_ converts an iterable into an Observable.

observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Item: {x}"))


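interval emits an increasing sequence of numbers at a fixed time interval.

import time
from reactivex import interval

observable = interval(1)  # every second
subscription = observable.subscribe(lambda x: print(f"Tick: {x}"))

time.sleep(5)  # keep the main thread alive while ticks arrive
subscription.dispose()  # stop receiving ticks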



timer emits a single value after a specified delay.

import time
from reactivex import timer

observable = timer(3)  # after 3 seconds
observable.subscribe(lambda x: print("Timer fired!"))
time.sleep(4)  # keep the main thread alive until the timer fires

Observable Transformation


ops.map applies a function to each element.

from reactivex import operators as ops

rx.from_([1, 2, 3]).pipe(  # illustrative source
    ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Square: {x}"))


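ops.flat_map maps each element to an Observable and flattens the results into a single stream.

def duplicate(x):
    return rx.from_([x, x * 2, x * 3])

rx.from_([1, 2, 3]).pipe(  # illustrative source
    ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Value: {x}"))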


ops.scan is analogous to reduce, but emits the accumulated result at each step.

rx.from_([1, 2, 3, 4]).pipe(  # illustrative source
    ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Sum: {x}"))

Data Filtering


ops.filter passes only the elements that satisfy a condition.

rx.from_([1, 2, 3, 4, 5, 6]).pipe(  # illustrative source
    ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Even number: {x}"))


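ops.debounce ignores values that arrive too quickly: a value is emitted only once a quiet period has passed without a newer one.

# Illustrative source and delay: 1 and 2 are superseded immediately,
# and 3 is flushed when the source completes.
rx.from_([1, 2, 3]).pipe(
    ops.debounce(0.5)
).subscribe(lambda x: print(f"Received: {x}"))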


ops.distinct passes only unique values.

rx.from_([1, 2, 2, 3, 1]).pipe(  # illustrative source
    ops.distinct()
).subscribe(lambda x: print(f"Unique value: {x}"))

Combining Observables


rx.merge combines multiple Observables into a single stream.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])

rx.merge(obs1, obs2).subscribe(lambda x: print(f"Item: {x}"))


rx.zip pairs up the elements of multiple Observables by position and emits them as tuples.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])

rx.zip(obs1, obs2).subscribe(lambda x: print(f"Pair: {x}"))


rx.combine_latest emits a tuple of the latest elements from each Observable whenever any of them produces a new value.

obs1 = rx.interval(1)
obs2 = rx.interval(1.5)

rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Combination: {x}"))

Testing Data Flows

Observables can be hot or cold:

  • Cold Observables start producing data at the moment of subscription, and each subscriber gets its own run of the sequence.

  • Hot Observables produce data regardless of subscribers, so a late subscriber misses earlier values.

Cold Observable example:

from reactivex.testing import TestScheduler

def create_cold_observable(scheduler):
    return rx.from_([1, 2, 3], scheduler=scheduler)

scheduler = TestScheduler()
observable = create_cold_observable(scheduler)

Example of a hot Observable:

from reactivex.testing import ReactiveTest, TestScheduler

def create_hot_observable(scheduler):
    return scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),
        ReactiveTest.on_next(210, 2),
    )

scheduler = TestScheduler()
observable = create_hot_observable(scheduler)

Using TestScheduler:

from reactivex.testing import TestScheduler, ReactiveTest

def test_map_operator():
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),  # emitted before subscription, so never observed
        ReactiveTest.on_next(210, 2),
    )

    def create():
        return xs.pipe(ops.map(lambda x: x * 10))

    # start() subscribes at virtual time 200 by default
    results = scheduler.start(create)
    assert results.messages == [
        ReactiveTest.on_next(210, 20),
    ]

Testing with Marbles

Marble diagrams let you describe data flows as strings: each '-' is one time frame, a character is an emitted value, and '|' marks completion.

from reactivex.testing.marbles import marbles_testing

def test_filter_operator():
    with marbles_testing() as (start, cold, hot, exp):
        source = cold('--1-2-3-4-5-|')
        expected = exp('----2---4---|')

        result = start(source.pipe(
            ops.filter(lambda x: int(x) % 2 == 0)
        ))

        assert result == expected

A couple of examples of using RxPY

asyncio integration

RxPY goes well with asyncio:

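import asyncio
from reactivex.scheduler.eventloop import AsyncIOScheduler

async def main():
    loop = asyncio.get_running_loop()
    observable = rx.interval(1).pipe(
        ops.take(5)  # assumed operator: five ticks fit into the 6-second sleep
    )
    observable.subscribe(
        on_next=lambda x: print(f"Tick: {x}"),
        on_error=lambda e: print(f"Error: {e}"),
        on_completed=lambda: print("Completed"),
        scheduler=AsyncIOScheduler(loop),  # assumed: run the timer on the asyncio loop
    )
    await asyncio.sleep(6)

asyncio.run(main())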


RxPY can also help when working with message queues and caches in Redis:

import redis
from reactivex import Subject

r = redis.Redis()

def listen_to_channel(channel):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)  # without this, listen() has no channel to read from
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield message['data']

channel_observable = rx.from_(listen_to_channel('my_channel'))

# Note: this call blocks the current thread while the generator waits for messages.
channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))

Event processing in event-driven architecture:

event_subject = Subject()

def handle_event(event):
    print(f"Handling event: {event}")

# Only click events reach the handler, reduced to their payload.
event_subject.pipe(
    ops.filter(lambda e: e['type'] == 'click'),
    ops.map(lambda e: e['payload'])
).subscribe(handle_event)

# Somewhere in the code
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})


RxPY is a great find for anyone who wants a different way to manage asynchronous data flows. My recommendation: use RxPY if you work with large volumes of asynchronous data or build event-driven systems. That is where it shows its full potential.

RxPY does have a learning curve. If you need to solve a simple problem with little asynchrony, the standard asyncio or threading modules will probably be easier and faster to learn. For complex, dynamic systems, though, RxPY starts to pay off.

To learn more about RxPY, check out its repository on GitHub.

