RxPY library overview
RxPY Basics
RxPY is a library that implements the principles of functional reactive programming in Python. It allows you to create and manage asynchronous data streams, combining them, filtering and transforming them using chains of operators. Main components of RxPY:
Observable: A data source that can emit events.
Observer: a subscriber that reacts to events from the Observable.
Operators: Functions that allow you to transform, filter, and combine an Observable.
Installing RxPY couldn't be easier:
pip install reactivex
In version 4.0.4 RxPY has undergone significant changes:
Renaming a module: now let's import
reactivex
notrx
.Improved typing: Added type annotations for better IDE support.
Updated work with operators: method usage
pipe
for a chain of operators.
If you worked with RxPY v3, here's how to integrate the whole thing:
Import changes:
import rx
->import reactivex as rx
.Operators: instead of
observable.map()
now we useobservable.pipe(ops.map())
.Removed obsolete functions: Some old operators and methods have been removed or renamed.
Creating an Observable and Working with Operators
Creating an Observable
just()
Creates an Observable that produces a single value.
import reactivex as rx
observable = rx.just(42)
observable.subscribe(lambda x: print(f"Значение: {x}"))
from_()
Converts an iterable object to an Observable.
observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Элемент: {x}"))
interval()
Returns a sequence of numbers at a specified time interval.
import time
from reactivex import interval
observable = interval(1) # каждую секунду
subscription = observable.subscribe(lambda x: print(f"Тик: {x}"))
time.sleep(5)
subscription.dispose()
timer()
Returns a value after a specified delay.
from reactivex import timer
observable = timer(3) # через 3 секунды
observable.subscribe(lambda x: print("Таймер сработал!"))
Observable Transformation
map()
Applies a function to each element.
from reactivex import operators as ops
observable.pipe(
ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Квадрат: {x}"))
flat_map()
Expands nested Observables.
def duplicate(x):
return rx.from_([x, x*2, x*3])
observable.pipe(
ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Значение: {x}"))
scan()
Analogue reduce
but produces the accumulated result at each iteration.
observable.pipe(
ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Сумма: {x}"))
Data filtering
filter()
Selects elements that satisfy a condition.
observable.pipe(
ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Четное число: {x}"))
debounce()
Ignores values if they arrive too quickly.
observable.pipe(
ops.debounce(0.5)
).subscribe(lambda x: print(f"Получено: {x}"))
distinct()
Passes only unique values.
observable.pipe(
ops.distinct()
).subscribe(lambda x: print(f"Уникальное значение: {x}"))
Combining Observables
merge()
Combines multiple Observables into one thread.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])
rx.merge(obs1, obs2).subscribe(lambda x: print(f"Элемент: {x}"))
zip()
Combines the elements of multiple Observables into tuples.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])
rx.zip(obs1, obs2).subscribe(lambda x: print(f"Сочетание: {x}"))
combine_latest()
Returns a combination of the last elements from each Observable.
obs1 = rx.interval(1)
obs2 = rx.interval(1.5)
rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Комбинация: {x}"))
Testing Data Flows
There are hot and cold observables
Cold Observable begin to provide data from the moment of subscription.
Hot Observable already generate data, regardless of subscribers.
Cold Observable example:
def create_cold_observable(scheduler):
return rx.from_([1, 2, 3], scheduler=scheduler)
scheduler = reactivex.testing.TestScheduler()
observable = create_cold_observable(scheduler)
Example of a hot Observable:
def create_hot_observable(scheduler):
return scheduler.create_hot_observable(
reactivex.testing.ReactiveTest.on_next(150, 1),
reactivex.testing.ReactiveTest.on_next(210, 2),
)
scheduler = reactivex.testing.TestScheduler()
observable = create_hot_observable(scheduler)
Using TestScheduler:
from reactivex.testing import TestScheduler, ReactiveTest
def test_map_operator():
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
ReactiveTest.on_next(150, 1),
ReactiveTest.on_next(210, 2),
ReactiveTest.on_completed(300)
)
def create():
return xs.pipe(ops.map(lambda x: x * 10))
results = scheduler.start(create)
assert results.messages == [
ReactiveTest.on_next(210, 20),
ReactiveTest.on_completed(300)
]
Testing with Marbles
Marble diagrams allow you to visualize data flows.
from reactivex.testing import marbles_testing
def test_filter_operator():
with marbles_testing() as (start, cold, hot, exp):
source = cold('--1-2-3-4-5-|')
expected = exp('----2---4---|')
result = start(source.pipe(
ops.filter(lambda x: int(x) % 2 == 0)
))
assert result == expected
A couple of examples of using RxPY
asyncio integration
RxPY goes well with asyncio
:
import asyncio
async def main():
loop = asyncio.get_event_loop()
observable = rx.interval(1).pipe(
ops.take(5)
)
observable.subscribe(
on_next=lambda x: print(f"Tick: {x}"),
on_error=lambda e: print(f"Error: {e}"),
on_completed=lambda: print("Completed"),
scheduler=rx.scheduler.AsyncIOScheduler(loop)
)
await asyncio.sleep(6)
asyncio.run(main())
RxPY can also help when working with message queues and caches in Redis:
import redis
from reactivex import Subject
r = redis.Redis()
def listen_to_channel(channel):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
yield message['data']
channel_observable = rx.from_(listen_to_channel('my_channel'))
channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))
Event processing in event-driven architecture:
event_subject = Subject()
def handle_event(event):
print(f"Handling event: {event}")
event_subject.pipe(
ops.filter(lambda e: e['type'] == 'click'),
ops.map(lambda e: e['payload'])
).subscribe(handle_event)
# Где-то в коде
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})
Conclusion
RxPY is a great find for those who want to manage asynchronous data flows in a new way. My recommendation: use RxPY if you work with large volumes of asynchronous data or build event-driven systems. In such projects she will reveal her full potential.
RxPY also has its own entry threshold. If you need to solve a simple problem with a minimum level of asynchrony, perhaps the default libraries asyncio
or threads will be easier and faster to learn. But when it comes to complex and dynamic systems, RxPY may already be needed.
You can learn more from RxPY check out their git.
In conclusion, we remind you about the open lessons that will be held in October as part of the Microservice Architecture course:
October 23. Metrics and Prometheus: Let's discuss how to collect and use metrics using Prometheus in Kubernetes to monitor applications. Sign up via link
October 24. Message Brokers: RabbitMQ and Kafka – Learn how to use RabbitMQ and Kafka to organize asynchronous communication between microservices. Sign up via link