Event-based architecture in Rust

How does EDA work?

  1. Events: The basic data or actions that occur in the system, such as clicking a button or completing a file download.

  2. Manufacturers: Components that create events and dispatch them to the system.

  3. Consumers: Components that subscribe to events and respond to them.

  4. Event Brokers: Tools or systems that manage the flow of events between producers and consumers.

As the Rust ecosystem matures, there are some good tools for working with event-based architecture, such as:

  • Tokyo: asynchronous platform for working with networks.

  • Actix: a high-performance framework for creating actor systems.

  • async-std: an asynchronous standard for working with Rust.

Installation and configuration of the environment

  1. Let's install Rust:

    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
  2. Let's install the necessary libraries:

Simple Event Based System Using Tokio

Let's create a simple application that will send and handle events using Tokio.

cargo new event_driven_example
cd event_driven_example

Let's add dependencies in Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

Now let's write the code itself:

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    // Создаем канал для передачи сообщений
    let (tx, mut rx) = mpsc::channel(32);

    // Запускаем асинхронную задачу для обработки событий
    task::spawn(async move {
        while let Some(event) = rx.recv().await {
            println!("Обработано событие: {}", event);
        }
    });

    // Генерация событий
    for i in 1..=10 {
        tx.send(format!("Событие {}", i)).await.unwrap();
    }
}

Here we are:

  1. Created a channel to transmit messages between producer and consumer.

  2. Launched an asynchronous task to handle events using tokio::task::spawn.

  3. Generate events and send them to the channel.

Using Actix for the Actor Model

Now let's look at using Actix to create a more complex actor-based system.

Let's add dependencies in Cargo.toml:

[dependencies]
actix = "0.12"
actix-web = "4.0.0-beta.8"
serde = { version = "1.0", features = ["derive"] }

Let's create actors:

use actix::prelude::*;
use serde::{Serialize, Deserialize};

#[derive(Message, Serialize, Deserialize)]
#[rtype(result = "()")]
struct Event {
    id: u32,
    message: String,
}

struct EventProducer;

impl Actor for EventProducer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventProducer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Произведено событие: {} - {}", event.id, event.message);
    }
}

struct EventConsumer;

impl Actor for EventConsumer {
    type Context = Context<Self>;
}

impl Handler<Event> for EventConsumer {
    type Result = ();

    fn handle(&mut self, event: Event, _: &mut Context<Self>) {
        println!("Получено событие: {} - {}", event.id, event.message);
    }
}

#[actix::main]
async fn main() {
    let producer = EventProducer.start();
    let consumer = EventConsumer.start();

    for i in 1..=10 {
        let event = Event {
            id: i,
            message: format!("Сообщение {}", i),
        };

        producer.do_send(event.clone());
        consumer.do_send(event);
    }
}

Explanation:

  1. Creating actors EventProducer And EventConsumerwhich handle events.

  2. We define the structure Event and implement a message for it Message.

  3. Launching actors and we pass events to them using do_send.

Implementation of an event broker

Event brokers can be good for managing the routing of events between different components of a system. You can create a simple broker using the library tokio.

Let's add dependencies to Cargo.toml:

[dependencies]
tokio = { version = "1.0", features = ["full"] }

Let's create a broker:

use tokio::sync::mpsc;
use tokio::task;

struct EventBroker {
    sender: mpsc::Sender<String>,
    receiver: mpsc::Receiver<String>,
}

impl EventBroker {
    fn new(buffer_size: usize) -> Self {
        let (sender, receiver) = mpsc::channel(buffer_size);
        EventBroker { sender, receiver }
    }

    async fn start(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            println!("Брокер обработал событие: {}", event);
        }
    }

    async fn send_event(&self, event: String) {
        self.sender.send(event).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let mut broker = EventBroker::new(32);

    task::spawn(async move {
        broker.start().await;
    });

    for i in 1..=10 {
        broker.send_event(format!("Событие {}", i)).await;
    }
}

Here we are:

  1. Created EventBroker with a channel for transmitting events.

  2. Confused broker in an asynchronous task and process incoming events.

  3. Sending events to the broker using send_event.

Optimization and scalability

Kafka and NATS

Apache Kafka – a distributed system for processing data streams in real time.

NATS – high-performance messaging system with pub/sub and request/reply support.

[dependencies]
tokio = { version = "1.0", features = ["full"] }
rdkafka = "0.29"
nats = "0.19"
// пример подключения к Kafka
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce_kafka_event() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .unwrap();

    producer.send(
        FutureRecord::to("my-topic")
            .payload("Это сообщение для Kafka")
            .key("ключ"),
        0,
    ).await.unwrap();
}
// пример подключения к NATS
use nats::asynk::Connection;

async fn publish_nats_event() {
    let nc = Connection::connect("localhost:4222").await.unwrap();

    nc.publish("events", "Это сообщение для NATS").await.unwrap();
}

Examples

Let's say we need to process a large number of events in real time and store the results in a database.

To do this, we will use Kafka to receive and process events and Rust to process data and write results to the database:

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use tokio_postgres::{NoTls, Client};

async fn process_events() {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "my-group")
        .create()
        .unwrap();

    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Ошибка соединения: {}", e);
        }
    });

    consumer.subscribe(&["my-topic"]).unwrap();

    while let Some(message) = consumer.recv().await.unwrap() {
        let payload = match message.payload_view::<str>() {
            Some(Ok(text)) => text,
            Some(Err(e)) => {
                eprintln!("Ошибка декодирования сообщения: {:?}", e);
                continue;
            }
            None => continue,
        };

        println!("Получено сообщение: {}", payload);

        client.execute("INSERT INTO events (data) VALUES ($1)", &[&payload]).await.unwrap();
    }
}

More information about the libraries used can be found at the hyperlinks:

  1. Tokyo — an asynchronous platform for working with networks.

  2. Actix — a high-performance framework for creating actor systems.

  3. async std — an asynchronous standard for working with Rust.


You can gain more practical skills in application architecture within the framework of practical online courses from industry experts.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *