Event-based architecture in Rust
How does event-driven architecture (EDA) work?
Events: The basic facts or actions that occur in the system, such as a button click or a completed file download.
Producers: Components that create events and dispatch them to the system.
Consumers: Components that subscribe to events and respond to them.
Event Brokers: Tools or systems that manage the flow of events between producers and consumers.
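The four roles above can be sketched with nothing but the standard library. This is a minimal illustration, not the approach used later in the article: the `Event` enum, the `produce`, `consume` and `run` functions and the file name are hypothetical, and a plain `std::sync::mpsc` channel stands in for the broker.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical event type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    ButtonClicked { id: u32 },
    DownloadFinished { file: String },
}

// Producer: creates events and dispatches them into the channel.
fn produce(tx: mpsc::Sender<Event>) {
    tx.send(Event::ButtonClicked { id: 1 }).unwrap();
    tx.send(Event::DownloadFinished { file: "report.pdf".to_string() }).unwrap();
    // `tx` is dropped here, which closes the channel.
}

// Consumer: reacts to every event until the channel is closed.
fn consume(rx: mpsc::Receiver<Event>) -> Vec<String> {
    rx.iter()
        .map(|event| match event {
            Event::ButtonClicked { id } => format!("button {} clicked", id),
            Event::DownloadFinished { file } => format!("{} downloaded", file),
        })
        .collect()
}

// Runs one producer thread against one consumer and returns the reactions.
fn run() -> Vec<String> {
    // The channel plays the role of a very small broker.
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || produce(tx));
    let handled = consume(rx);
    producer.join().unwrap();
    handled
}

fn main() {
    for line in run() {
        println!("{}", line);
    }
}
```

The producer never calls the consumer directly; it only knows about the channel, which is the decoupling that EDA relies on.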
As the Rust ecosystem matures, it offers several good tools for building event-driven systems, such as:
Tokio: an asynchronous runtime for network applications.
Actix: a high-performance framework for building actor systems.
async-std: an asynchronous counterpart to the Rust standard library.
Installation and configuration of the environment
Let's install Rust:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
The necessary libraries are added as dependencies in Cargo.toml in each of the examples below.
Simple Event Based System Using Tokio
Let's create a simple application that will send and handle events using Tokio.
cargo new event_driven_example
cd event_driven_example
Let's add the dependency to Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Now let's write the code itself:
use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    // Create a bounded channel for passing events
    let (tx, mut rx) = mpsc::channel(32);

    // Spawn an asynchronous task that handles events
    let consumer = task::spawn(async move {
        while let Some(event) = rx.recv().await {
            println!("Handled event: {}", event);
        }
    });

    // Generate events
    for i in 1..=10 {
        tx.send(format!("Event {}", i)).await.unwrap();
    }

    // Close the channel and wait for the consumer to drain it;
    // otherwise main could return before every event is handled
    drop(tx);
    consumer.await.unwrap();
}
Here we:
Created a channel to pass messages between the producer and the consumer.
Launched an asynchronous task to handle events using tokio::task::spawn.
Generated events and sent them to the channel.
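The number passed to the channel is its capacity: once that many events are queued and not yet consumed, a Tokio `send(...).await` waits until the consumer catches up. The sketch below shows the same bounded-buffer behaviour with the standard library's `sync_channel` as a stand-in for Tokio, so it runs without an async runtime. The `fill_channel` helper is hypothetical and exists only for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Tries to queue `count` events into a channel of the given capacity while
// nobody is receiving, and reports how many were accepted.
fn fill_channel(capacity: usize, count: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<String>(capacity);
    let mut accepted = 0;
    for i in 1..=count {
        match tx.try_send(format!("Event {}", i)) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("accepted {} of 40 events", fill_channel(32, 40));
}
```

A bounded channel gives the system backpressure: a fast producer is slowed down instead of filling memory with unprocessed events.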
Using Actix for the Actor Model
Now let's look at using Actix to create a more complex actor-based system.
Let's add the dependencies to Cargo.toml:
[dependencies]
actix = "0.12"
actix-web = "4"
serde = { version = "1.0", features = ["derive"] }
Let's create actors:
use actix::prelude::*;
use serde::{Deserialize, Serialize};

// The event is an Actix message; Clone lets us send a copy to each actor
#[derive(Message, Clone, Serialize, Deserialize)]
#[rtype(result = "()")]
struct Event {
    id: u32,
    message: String,
}

struct EventProducer;

impl Actor for EventProducer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventProducer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Produced event: {} - {}", event.id, event.message);
    }
}

struct EventConsumer;

impl Actor for EventConsumer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventConsumer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Received event: {} - {}", event.id, event.message);
    }
}

#[actix::main]
async fn main() {
    let producer = EventProducer.start();
    let consumer = EventConsumer.start();

    for i in 1..=10 {
        let event = Event {
            id: i,
            message: format!("Message {}", i),
        };
        // send(...).await waits until the handler has run; do_send would only
        // queue the message, and main could exit before it is processed
        producer.send(event.clone()).await.unwrap();
        consumer.send(event).await.unwrap();
    }
}
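Explanation:
We create the actors EventProducer and EventConsumer, which handle events.
We define the Event structure and derive Message for it.
We start the actors and pass events to their addresses; do_send queues a message without waiting for the result, while send returns a future that resolves once the handler has run.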
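To make the actor idea itself more concrete, here is a minimal sketch without Actix: a thread that owns private state and handles one message at a time from its mailbox. It is an illustration under stated assumptions, not how Actix is implemented; the `Msg` enum and the `spawn_counter` and `send_and_count` functions are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch.
enum Msg {
    Event { id: u32, message: String },
    // Asks the actor for the number of events handled so far.
    GetCount(mpsc::Sender<usize>),
}

// Spawns a thread that owns its state and handles one message at a time.
// The returned sender plays the role of the actor's address.
fn spawn_counter() -> mpsc::Sender<Msg> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut handled = 0usize; // private state, never shared
        for msg in rx {
            match msg {
                Msg::Event { id, message } => {
                    println!("Received event: {} - {}", id, message);
                    handled += 1;
                }
                Msg::GetCount(reply) => {
                    // The requester may have gone away; ignore that case.
                    let _ = reply.send(handled);
                }
            }
        }
    });
    tx
}

// Sends `n` events to a fresh actor and returns the count it reports.
fn send_and_count(n: u32) -> usize {
    let actor = spawn_counter();
    for i in 1..=n {
        actor
            .send(Msg::Event { id: i, message: format!("Message {}", i) })
            .unwrap();
    }
    // The mailbox is FIFO, so this request is handled after all events.
    let (reply_tx, reply_rx) = mpsc::channel();
    actor.send(Msg::GetCount(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}

fn main() {
    println!("handled: {}", send_and_count(3));
}
```

Because only the actor's own thread touches `handled`, no lock is needed; all coordination happens through messages.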
Implementation of an event broker
An event broker is useful for managing the routing of events between different components of a system. You can build a simple broker with the tokio library.
Let's add the dependency to Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Let's create a broker:
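use tokio::sync::mpsc;
use tokio::task;

// Receiving side: owns the queue and processes incoming events
struct EventBroker {
    receiver: mpsc::Receiver<String>,
}

// Sending side: a cheap, cloneable handle for producers
#[derive(Clone)]
struct EventSender {
    sender: mpsc::Sender<String>,
}

impl EventBroker {
    fn new(buffer_size: usize) -> (Self, EventSender) {
        let (sender, receiver) = mpsc::channel(buffer_size);
        (EventBroker { receiver }, EventSender { sender })
    }

    // Runs until every EventSender has been dropped
    async fn start(mut self) {
        while let Some(event) = self.receiver.recv().await {
            println!("Broker handled event: {}", event);
        }
    }
}

impl EventSender {
    async fn send_event(&self, event: String) {
        self.sender.send(event).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    // The broker moves into its task; main keeps only the sender handle
    let (broker, events) = EventBroker::new(32);
    let broker_task = task::spawn(broker.start());

    for i in 1..=10 {
        events.send_event(format!("Event {}", i)).await;
    }

    // Dropping the last sender closes the channel and lets the broker finish
    drop(events);
    broker_task.await.unwrap();
}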
Here we:
Created an EventBroker with a channel for transmitting events.
Launched the broker in an asynchronous task that processes incoming events.
Sent events to the broker using send_event.
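The broker above forwards everything to a single handler. Routing between several components usually means delivering an event only to those subscribed to its topic. The following is a minimal sketch of that idea using standard-library channels rather than Tokio; the `TopicBroker` type, its methods and the topic names are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical in-process broker that routes events by topic name.
#[derive(Default)]
struct TopicBroker {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl TopicBroker {
    // Registers a new subscriber and returns the receiving end of its queue.
    fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    // Delivers a copy of the event to every subscriber of the topic and
    // returns how many deliveries succeeded.
    fn publish(&mut self, topic: &str, event: &str) -> usize {
        match self.subscribers.get_mut(topic) {
            Some(subs) => {
                // Forget subscribers whose receiver has been dropped.
                subs.retain(|tx| tx.send(event.to_string()).is_ok());
                subs.len()
            }
            None => 0,
        }
    }
}

fn main() {
    let mut broker = TopicBroker::default();
    let orders = broker.subscribe("orders");
    let audit = broker.subscribe("orders");
    let delivered = broker.publish("orders", "order created");
    println!("delivered to {} subscribers", delivered);
    println!("{:?} / {:?}", orders.try_recv(), audit.try_recv());
    println!("unknown topic: {}", broker.publish("payments", "ignored"));
}
```

Producers only name a topic, so subscribers can be added or removed without touching the publishing code.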
Optimization and scalability
Kafka and NATS
Apache Kafka – a distributed system for processing data streams in real time.
NATS – a high-performance messaging system with pub/sub and request/reply support.
Both have Rust client crates, which we add to Cargo.toml:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
rdkafka = "0.29"
nats = "0.19"
use std::time::Duration;

// Example of connecting to Kafka
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce_kafka_event() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .unwrap();

    producer
        .send(
            FutureRecord::to("my-topic")
                .payload("This is a message for Kafka")
                .key("key"),
            // How long to wait if the producer queue is full
            Duration::from_secs(0),
        )
        .await
        .unwrap();
}

// Example of connecting to NATS
async fn publish_nats_event() {
    let nc = nats::asynk::connect("localhost:4222").await.unwrap();
    nc.publish("events", "This is a message for NATS").await.unwrap();
}
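The Kafka record above carries a key. In Kafka, records with the same key are normally routed to the same partition, which preserves per-key ordering while the topic is spread across brokers. The sketch below illustrates only the principle of "hash of key modulo partition count": FNV-1a is a simple stand-in chosen for this example, not the hash that Kafka clients actually use, and `partition_for` is a hypothetical helper.

```rust
// FNV-1a, used here only as a simple deterministic stand-in hash.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

// Maps a record key to one of `partitions` partitions.
fn partition_for(key: &str, partitions: u64) -> u64 {
    assert!(partitions > 0, "a topic needs at least one partition");
    fnv1a(key.as_bytes()) % partitions
}

fn main() {
    // The same key always lands on the same partition.
    for key in ["user-1", "user-2", "user-1"] {
        println!("{} -> partition {}", key, partition_for(key, 3));
    }
}
```

Choosing a key such as a user or order identifier therefore decides which events are guaranteed to be consumed in order relative to each other.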
Examples
Let's say we need to process a large number of events in real time and store the results in a database.
To do this, we will use Kafka to deliver the events and Rust to process the data and write the results to the database (this additionally requires the tokio-postgres crate in Cargo.toml):
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use tokio_postgres::NoTls;

async fn process_events() {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .create()
        .unwrap();

    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls)
            .await
            .unwrap();

    // The connection object drives the actual I/O, so it runs in its own task
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Connection error: {}", e);
        }
    });

    consumer.subscribe(&["my-topic"]).unwrap();

    loop {
        // recv() returns a Result, not an Option, so handle errors explicitly
        let message = match consumer.recv().await {
            Ok(message) => message,
            Err(e) => {
                eprintln!("Kafka error: {}", e);
                continue;
            }
        };

        let payload = match message.payload_view::<str>() {
            Some(Ok(text)) => text,
            Some(Err(e)) => {
                eprintln!("Message decoding error: {:?}", e);
                continue;
            }
            None => continue,
        };

        println!("Received message: {}", payload);
        client
            .execute("INSERT INTO events (data) VALUES ($1)", &[&payload])
            .await
            .unwrap();
    }
}
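The three-way match on the payload (valid text, undecodable bytes, no payload at all) is the part of this loop that decides what reaches the database. It can be tried in isolation without Kafka or PostgreSQL. In this sketch, `Decoded`, `decode` and `storable` are hypothetical names, and a raw `Option<&[u8]>` stands in for the message payload.

```rust
// Outcome of looking at one raw message payload.
#[derive(Debug, PartialEq)]
enum Decoded<'a> {
    Text(&'a str),
    Invalid,
    Empty,
}

// Mirrors the three-way match: valid text, undecodable bytes, or no payload.
fn decode(payload: Option<&[u8]>) -> Decoded<'_> {
    match payload.map(std::str::from_utf8) {
        Some(Ok(text)) => Decoded::Text(text),
        Some(Err(_)) => Decoded::Invalid,
        None => Decoded::Empty,
    }
}

// Keeps only the payloads that would be written to the database.
fn storable<'a>(payloads: &[Option<&'a [u8]>]) -> Vec<&'a str> {
    payloads
        .iter()
        .filter_map(|p| match decode(*p) {
            Decoded::Text(text) => Some(text),
            _ => None,
        })
        .collect()
}

fn main() {
    let invalid = [0xff_u8, 0xfe]; // not valid UTF-8
    let raw = [Some("order created".as_bytes()), Some(&invalid[..]), None];
    println!("{:?}", storable(&raw));
}
```

Skipping bad messages instead of panicking keeps one malformed event from stopping the whole consumer.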
More information about the libraries used can be found in their documentation:
Tokio: an asynchronous runtime for network applications.
Actix: a high-performance framework for building actor systems.
async-std: an asynchronous counterpart to the Rust standard library.