RxPY library overview

RxPY Basics

RxPY is a library that implements the principles of functional reactive programming in Python. It allows you to create and manage asynchronous data streams, combining them, filtering and transforming them using chains of operators. Main components of RxPY:

  • Observable: A data source that can emit events.

  • Observer: a subscriber that reacts to events from the Observable.

  • Operators: Functions that allow you to transform, filter, and combine an Observable.

Installing RxPY couldn't be easier:

pip install reactivex

In version 4.0.4 RxPY has undergone significant changes:

  • Renaming a module: now let's import reactivexnot rx.

  • Improved typing: Added type annotations for better IDE support.

  • Updated work with operators: method usage pipe for a chain of operators.

If you worked with RxPY v3, here's how to integrate the whole thing:

  • Import changes: import rx -> import reactivex as rx.

  • Operators: instead of observable.map() now we use observable.pipe(ops.map()).

  • Removed obsolete functions: Some old operators and methods have been removed or renamed.

Creating an Observable and Working with Operators

Creating an Observable

just()

Creates an Observable that produces a single value.

import reactivex as rx

observable = rx.just(42)
observable.subscribe(lambda x: print(f"Значение: {x}"))

from_()

Converts an iterable object to an Observable.

observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Элемент: {x}"))

interval()

Returns a sequence of numbers at a specified time interval.

import time
from reactivex import interval

observable = interval(1)  # каждую секунду
subscription = observable.subscribe(lambda x: print(f"Тик: {x}"))

time.sleep(5)
subscription.dispose()

timer()

Returns a value after a specified delay.

from reactivex import timer

observable = timer(3)  # через 3 секунды
observable.subscribe(lambda x: print("Таймер сработал!"))

Observable Transformation

map()

Applies a function to each element.

from reactivex import operators as ops

observable.pipe(
    ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Квадрат: {x}"))

flat_map()

Expands nested Observables.

def duplicate(x):
    return rx.from_([x, x*2, x*3])

observable.pipe(
    ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Значение: {x}"))

scan()

Analogue reducebut produces the accumulated result at each iteration.

observable.pipe(
    ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Сумма: {x}"))

Data filtering

filter()

Selects elements that satisfy a condition.

observable.pipe(
    ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Четное число: {x}"))

debounce()

Ignores values ​​if they arrive too quickly.

observable.pipe(
    ops.debounce(0.5)
).subscribe(lambda x: print(f"Получено: {x}"))

distinct()

Passes only unique values.

observable.pipe(
    ops.distinct()
).subscribe(lambda x: print(f"Уникальное значение: {x}"))

Combining Observables

merge()

Combines multiple Observables into one thread.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])

rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))

zip()

Combines the elements of multiple Observables into tuples.

obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])

rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))

combine_latest()

Returns a combination of the last elements from each Observable.

obs1 = rx.interval(1)
obs2 = rx.interval(1.5)

rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))

Testing Data Flows

There are hot and cold observables

  • Cold Observable begin to provide data from the moment of subscription.

  • Hot Observable already generate data, regardless of subscribers.

Cold Observable example:

def create_cold_observable(scheduler):
    return rx.from_([1, 2, 3], scheduler=scheduler)

scheduler = reactivex.testing.TestScheduler()
observable = create_cold_observable(scheduler)

Example of a hot Observable:

def create_hot_observable(scheduler):
    return scheduler.create_hot_observable(
        reactivex.testing.ReactiveTest.on_next(150, 1),
        reactivex.testing.ReactiveTest.on_next(210, 2),
    )

scheduler = reactivex.testing.TestScheduler()
observable = create_hot_observable(scheduler)

Using TestScheduler:

from reactivex.testing import TestScheduler, ReactiveTest

def test_map_operator():
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),
        ReactiveTest.on_next(210, 2),
        ReactiveTest.on_completed(300)
    )

    def create():
        return xs.pipe(ops.map(lambda x: x * 10))

    results = scheduler.start(create)
    assert results.messages == [
        ReactiveTest.on_next(210, 20),
        ReactiveTest.on_completed(300)
    ]

Testing with Marbles

Marble diagrams allow you to visualize data flows.

from reactivex.testing import marbles_testing

def test_filter_operator():
    with marbles_testing() as (start, cold, hot, exp):
        source = cold('--1-2-3-4-5-|')
        expected = exp('----2---4---|')

        result = start(source.pipe(
            ops.filter(lambda x: int(x) % 2 == 0)
        ))

        assert result == expected

A couple of examples of using RxPY

asyncio integration

RxPY goes well with asyncio:

import asyncio

async def main():
    loop = asyncio.get_event_loop()
    observable = rx.interval(1).pipe(
        ops.take(5)
    )

    observable.subscribe(
        on_next=lambda x: print(f"Tick: {x}"),
        on_error=lambda e: print(f"Error: {e}"),
        on_completed=lambda: print("Completed"),
        scheduler=rx.scheduler.AsyncIOScheduler(loop)
    )

    await asyncio.sleep(6)

asyncio.run(main())

RxPY can also help when working with message queues and caches in Redis:

import redis
from reactivex import Subject

r = redis.Redis()

def listen_to_channel(channel):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield message['data']

channel_observable = rx.from_(listen_to_channel('my_channel'))

channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))

Event processing in event-driven architecture:

event_subject = Subject()

def handle_event(event):
    print(f"Handling event: {event}")

event_subject.pipe(
    ops.filter(lambda e: e['type'] == 'click'),
    ops.map(lambda e: e['payload'])
).subscribe(handle_event)

# Где-то в коде
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})

Conclusion

RxPY is a great find for those who want to manage asynchronous data flows in a new way. My recommendation: use RxPY if you work with large volumes of asynchronous data or build event-driven systems. In such projects she will reveal her full potential.

RxPY also has its own entry threshold. If you need to solve a simple problem with a minimum level of asynchrony, perhaps the default libraries asyncio or threads will be easier and faster to learn. But when it comes to complex and dynamic systems, RxPY may already be needed.

You can learn more from RxPY check out their git.


In conclusion, we remind you about the open lessons that will be held in October as part of the Microservice Architecture course:

  • October 23. Metrics and Prometheus: Let's discuss how to collect and use metrics using Prometheus in Kubernetes to monitor applications. Sign up via link

  • October 24. Message Brokers: RabbitMQ and Kafka – Learn how to use RabbitMQ and Kafka to organize asynchronous communication between microservices. Sign up via link

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *