Data Frameworks in Rust. Part 1

Polars

Unlike pandas, which can put a noticeable load on CPU and memory, Polars leverages Rust's support for parallel computing and optimized resource usage.

The main advantages of Polars:

  • Fast processing of large datasets: Polars is written in Rust and parallelizes work across multiple threads.

  • Columnar data model: Polars is built on the Apache Arrow columnar memory format, which keeps scans and aggregations cache-friendly.

  • Lazy and eager execution models: Polars supports both eager evaluation and a lazy API that builds and optimizes a query plan before executing it (see the sketch after this list).

  • Support for multiple data formats: CSV, JSON, Parquet, IPC, Avro, and others.
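
To make the lazy model concrete, below is a minimal sketch written against the same Polars API vintage as the examples that follow: calling lazy() only records a query plan, and nothing executes until collect() runs the optimized plan.

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let df = df!["x" => &[1, 2, 3, 4]]?;

    // building the lazy query does no work yet; it only records a plan
    let plan = df.lazy()
        .filter(col("x").gt(lit(1)))
        .select([(col("x") * lit(2)).alias("doubled")]);

    // collect() optimizes and executes the plan, materializing a DataFrame
    let out = plan.collect()?;
    println!("{:?}", out);

    Ok(())
}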

Examples of use

Reading and processing a CSV file

Let's load data from a CSV file, filter the rows, and perform a grouped aggregation:

use polars::prelude::*;
use std::fs::File;

fn main() -> PolarsResult<()> {
    // read the CSV file
    let file = File::open("data.csv")?;
    let df = CsvReader::new(file)
        .infer_schema(None)
        .has_header(true)
        .finish()?;

    // filter rows on a condition
    let filtered_df = df
        .lazy()
        .filter(col("column_name").gt(lit(10)))
        .collect()?;

    // group and aggregate
    let result_df = filtered_df
        .lazy()
        .groupby([col("group_column")])
        .agg([col("value_column").mean()])
        .collect()?;

    println!("{:?}", result_df);

    Ok(())
}
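
As a small follow-up to the formats bullet above, here is a sketch of writing the aggregated result back to disk. It assumes the same Polars vintage as the example; the output path result.csv is just an illustration.

use polars::prelude::*;
use std::fs::File;

fn write_result(df: &mut DataFrame) -> PolarsResult<()> {
    // create the output file and stream the DataFrame into it as CSV
    let file = File::create("result.csv")?;
    CsvWriter::new(file)
        .has_header(true)
        .finish(df)?;
    Ok(())
}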

Creating a DataFrame and Performing Operations

Let's create a DataFrame, add a computed column, and filter the rows.

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // create a DataFrame
    let df = df![
        "column1" => &[1, 2, 3, 4, 5],
        "column2" => &[10, 20, 30, 40, 50]
    ]?;

    // add a new column holding the sum of the two columns
    let df = df.lazy()
        .with_column((col("column1") + col("column2")).alias("sum"))
        .collect()?;

    // keep only the rows where the sum is greater than 30
    let filtered_df = df.lazy()
        .filter(col("sum").gt(lit(30)))
        .collect()?;

    println!("{:?}", filtered_df);

    Ok(())
}
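
Note that this example runs two separate lazy()/collect() round trips, which materializes an intermediate DataFrame. The same computation can be expressed as a single lazy chain, letting the optimizer see the whole query at once; a sketch:

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // one plan covering both the derived column and the filter
    let filtered_df = df![
        "column1" => &[1, 2, 3, 4, 5],
        "column2" => &[10, 20, 30, 40, 50]
    ]?
    .lazy()
    .with_column((col("column1") + col("column2")).alias("sum"))
    .filter(col("sum").gt(lit(30)))
    .collect()?;

    println!("{:?}", filtered_df);

    Ok(())
}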

Lazy Evaluation and Working with JSON

use polars::prelude::*;
use std::io::Cursor;

fn main() -> PolarsResult<()> {
    let json_data = r#"
        [
            {"name": "Alice", "age": 25},
            {"name": "Bob", "age": 30},
            {"name": "Charlie", "age": 35}
        ]
    "#;

    // read the JSON data; the reader needs a seekable source, so wrap
    // the byte slice in a Cursor
    let df = JsonReader::new(Cursor::new(json_data.as_bytes())).finish()?;

    // lazy evaluation: filter by age, then compute the mean age
    let result = df.lazy()
        .filter(col("age").gt(lit(25)))
        .select([col("age").mean()])
        .collect()?;

    println!("{:?}", result);

    Ok(())
}

More details can be found in the Polars documentation.

Arroyo

Arroyo is a distributed stream processing engine focused on stateful computing with support for both bounded and unbounded data streams.

Arroyo is built around the dataflow model, which manages the state of data streams and enables complex computations such as windowed aggregations and join operations. All of this functionality is likewise implemented in Rust.

Arroyo Usage Examples

Let's implement a basic setup that processes an event stream and counts the number of events in a time window:

use arroyo::pipeline::Pipeline;
use arroyo::window::TumblingWindow;

fn main() {
    // initialize the data-processing pipeline
    let mut pipeline = Pipeline::new();

    // data source
    let source = pipeline.add_source("source_name");

    // apply a tumbling window with a 5-minute span
    let windowed = source.window(TumblingWindow::minutes(5))
                         .count();

    // print the result to the console
    pipeline.add_sink(windowed, |result| println!("Event count: {:?}", result));

    // run the pipeline
    pipeline.run();
}
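
The tumbling-window semantics used here can be stated independently of any Arroyo API: each event timestamp maps to exactly one non-overlapping bucket. A self-contained sketch of that bucketing (plain Rust, no Arroyo dependency):

use std::collections::HashMap;

fn main() {
    let window_secs: u64 = 5 * 60; // 5-minute tumbling window

    // (timestamp in seconds, event payload)
    let events = [(10u64, "a"), (290, "b"), (310, "c"), (899, "d")];

    // each event lands in exactly one bucket keyed by its window start
    let mut counts: HashMap<u64, u64> = HashMap::new();
    for (ts, _payload) in events {
        let bucket = ts / window_secs * window_secs;
        *counts.entry(bucket).or_insert(0) += 1;
    }

    for (start, n) in &counts {
        println!("window [{start}s, {}s): {n} events", start + window_secs);
    }
}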

Now let's implement stateful processing, where state must be tracked between events, for example computing a running average over previously seen data:

use arroyo::state::StatefulOperator;
use arroyo::pipeline::Pipeline;

struct AverageState {
    sum: f64,
    count: u64,
}

impl StatefulOperator for AverageState {
    type Input = f64;
    type Output = f64;

    fn process(&mut self, value: Self::Input) -> Option<Self::Output> {
        self.sum += value;
        self.count += 1;
        Some(self.sum / self.count as f64)
    }
}

fn main() {
    let mut pipeline = Pipeline::new();

    // initialize the data source
    let source = pipeline.add_source("numeric_data");

    // apply a stateful operation that maintains a running average
    let averaged = source.stateful_operator(AverageState { sum: 0.0, count: 0 });

    // print the result to the console
    pipeline.add_sink(averaged, |avg| println!("Running average: {:?}", avg));

    // run the pipeline
    pipeline.run();
}
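
The running-average logic itself does not depend on the framework, so it can be checked in isolation; a standalone version of the same state transition:

struct AverageState {
    sum: f64,
    count: u64,
}

impl AverageState {
    // same update rule as the operator above: accumulate, then average
    fn process(&mut self, value: f64) -> f64 {
        self.sum += value;
        self.count += 1;
        self.sum / self.count as f64
    }
}

fn main() {
    let mut state = AverageState { sum: 0.0, count: 0 };
    for v in [10.0, 20.0, 30.0] {
        // prints 10, 15, 20: the average over everything seen so far
        println!("running average: {}", state.process(v));
    }
}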

Now let's look at how Arroyo can be used for more complex tasks, such as combining multiple data streams and performing windowed aggregations:

use arroyo::pipeline::Pipeline;
use arroyo::window::SlidingWindow;

fn main() {
    let mut pipeline = Pipeline::new();

    // initialize two data sources
    let source1 = pipeline.add_source("source1");
    let source2 = pipeline.add_source("source2");

    // windowed operations on the two streams
    let windowed1 = source1.window(SlidingWindow::minutes(10)).sum();
    let windowed2 = source2.window(SlidingWindow::minutes(10)).sum();

    // join the two streams by key
    let joined = windowed1.join(windowed2, |key1, key2| key1 == key2);

    // process the result
    pipeline.add_sink(joined, |result| println!("Join result: {:?}", result));

    // run the pipeline
    pipeline.run();
}
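
Unlike tumbling windows, sliding windows overlap, so a single event contributes to several windows. A framework-independent sketch, assuming for illustration a 10-minute window that advances every 5 minutes (the hop length is an assumption; the example above does not specify one):

fn main() {
    let size: u64 = 600;  // window length: 10 minutes, in seconds
    let slide: u64 = 300; // hop between window starts: 5 minutes (assumed)
    let event_ts: u64 = 700;

    // enumerate every window [start, start + size) containing the event
    let mut start = event_ts.saturating_sub(size - 1) / slide * slide;
    while start <= event_ts {
        if event_ts < start + size {
            println!("event at {event_ts}s falls in window [{start}s, {}s)",
                     start + size);
        }
        start += slide;
    }
}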

You can find out more in the Arroyo documentation.

Timber

Timber is a simple logger designed for applications running in multithreaded environments. Its main purpose is to simplify logging from parallel tasks. Timber can write logs both to standard output and to a specified file.

Main features of Timber:

  1. Timber supports logging levels that can be configured via macros.

  2. By default, Timber outputs logs to stdout, but can be easily reconfigured to write to a file.

An example of using Timber in an application that switches logging levels between debug and release builds:

#[macro_use(timber)]
extern crate timber;

#[cfg(debug_assertions)]
pub mod level {
    pub const ERR: i32 = 1;
    pub const DEB: i32 = 2;
    pub const INF: i32 = 7;
}

#[cfg(not(debug_assertions))]
pub mod level {
    pub const ERR: i32 = 1;
    pub const DEB: i32 = 0;
    pub const INF: i32 = 3;
}

// convenience macros for logging
macro_rules! log_err{($($arg:tt)*) => {timber!($crate::level::ERR, "ERR", $($arg)*)}}
macro_rules! log_deb{($($arg:tt)*) => {timber!($crate::level::DEB, "DEB", $($arg)*)}}
macro_rules! log_inf{($($arg:tt)*) => {timber!($crate::level::INF, "INF", $($arg)*)}}

fn main() {
    timber::init("log.txt").unwrap(); // initialize logging to a file

    log_err!("Error! This line is logged in every build.");
    log_deb!("Debug. This line is visible only in debug builds.");
    log_inf!("Info. This line is visible in both release and debug builds.");
}

Because the log-level constants ERR, DEB, and INF are chosen per build profile, the compiler can drop the disabled log statements from the release build entirely.
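
A standalone sketch of why this works (a hypothetical macro, not Timber's own): when the level is a compile-time constant, the guard around a disabled log statement folds to a constant false and the optimizer removes the call entirely.

// hypothetical compile-time log level; 1 = errors only, as in a release build
const LOG_LEVEL: i32 = 1;

macro_rules! log_at {
    ($lvl:expr, $($arg:tt)*) => {
        // both sides are constants, so this branch is resolved at compile
        // time and dead log statements disappear from the binary
        if $lvl <= LOG_LEVEL {
            println!($($arg)*);
        }
    };
}

fn main() {
    log_at!(1, "error: compiled into every build");
    log_at!(2, "debug: eliminated because 2 > LOG_LEVEL");
}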

Colleagues from OTUS cover more practical tools in their online courses, taught by industry experts. You can learn more about the courses in the catalog.
