Lock-free data structures in Rust

Lock-free data structures allow multiple threads to read and modify shared data simultaneously without blocking each other.

Basics of lock-free programming in Rust

Rust offers thread safety thanks to its ownership model and type system:

  1. Ownership model

    • Every value in Rust has exactly one owner at any given time, so two threads cannot both own the same value. This rules out unsynchronized concurrent access to the same data.

    • Lifetimes control how long references remain valid, preventing the use of dangling references.

  2. Types and compile-time checking

    • The Send trait: indicates that a value of the type can be safely transferred to another thread.

    • The Sync trait: indicates that the type can be safely accessed from multiple threads simultaneously (that is, &T is Send).

Rust verifies correct use of Send and Sync at compile time.

use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let mut handles = vec![];

    for _ in 0..3 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            println!("{:?}", data);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Here Arc (atomic reference counting) shares the vector between threads: Send allows the Arc to be moved into each spawned thread, and Sync guarantees that the shared data can be accessed safely from all of them.

Atomic data types

Rust's atomic types let you perform operations on shared data without locking.

The main atomic data types in the Rust standard library include:

  1. AtomicBool

    • Atomic Boolean variable.

    • Example:

      use std::sync::atomic::{AtomicBool, Ordering};
      use std::sync::Arc;
      use std::thread;

      let flag = Arc::new(AtomicBool::new(false));
      let writer = Arc::clone(&flag);

      let handle = thread::spawn(move || {
          writer.store(true, Ordering::Relaxed);
      });

      // join() creates a happens-before edge, so even the Relaxed load sees the store
      handle.join().unwrap();
      assert_eq!(flag.load(Ordering::Relaxed), true);
  2. AtomicIsize and AtomicUsize

    • Atomic integers (signed and unsigned).

    • Example:

      use std::sync::atomic::{AtomicUsize, Ordering};
      use std::sync::Arc;
      use std::thread;

      let counter = Arc::new(AtomicUsize::new(0));
      let handles: Vec<_> = (0..10).map(|_| {
          // each thread owns its own Arc handle to the shared counter
          let counter = Arc::clone(&counter);
          thread::spawn(move || {
              for _ in 0..1000 {
                  counter.fetch_add(1, Ordering::SeqCst);
              }
          })
      }).collect();
      
      for handle in handles {
          handle.join().unwrap();
      }
      
      assert_eq!(counter.load(Ordering::SeqCst), 10000);
  3. AtomicPtr

    • Atomic pointer.

    • Example:

      use std::sync::atomic::{AtomicPtr, Ordering};

      let mut data = 5;
      let atomic_ptr = AtomicPtr::new(&mut data);

      // only the pointer itself is swapped atomically; both pointees
      // must stay alive for as long as loads can observe them
      let mut new_data = 10;
      atomic_ptr.store(&mut new_data, Ordering::SeqCst);

      assert_eq!(unsafe { *atomic_ptr.load(Ordering::SeqCst) }, 10);

Atomic operations preserve data integrity without any locks!
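Beyond load, store, and fetch_add, the workhorse of lock-free algorithms is compare-and-swap, exposed in Rust as compare_exchange and compare_exchange_weak. Here is a minimal sketch of the typical CAS retry loop; the helper function and its "even values only" policy are illustrative, not part of any standard API:

use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative helper: add 2, but only while the value stays even.
fn add_two_if_even(counter: &AtomicUsize) -> bool {
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        if current % 2 != 0 {
            return false; // another thread made it odd; give up
        }
        // compare_exchange_weak may fail spuriously, hence the retry loop
        match counter.compare_exchange_weak(
            current,
            current + 2,
            Ordering::SeqCst,
            Ordering::Relaxed,
        ) {
            Ok(_) => return true,
            Err(actual) => current = actual, // lost the race; retry with the fresh value
        }
    }
}

fn main() {
    let counter = AtomicUsize::new(0);
    assert!(add_two_if_even(&counter));
    assert_eq!(counter.load(Ordering::Relaxed), 2);
}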

Crossbeam for lock-free data structures

The crossbeam library simplifies multi-threaded programming and provides high-performance, safe lock-free data structures.

The main elements of Crossbeam:

  • Data structures: multi-threaded data structures such as queues and deques.

  • Epoch-based garbage collection: A memory management mechanism that minimizes blocking during garbage collection.

  • Concurrency primitives: synchronization utilities that make concurrent code easier to write (see the sketch below).
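For example, crossbeam's scoped threads (from the crossbeam_utils crate) let spawned threads borrow data from the parent stack frame, which plain thread::spawn forbids. A minimal sketch:

use crossbeam_utils::thread;

fn main() {
    let data = vec![1, 2, 3];
    // the scope guarantees both threads finish before `data` is dropped,
    // so they can borrow it without an Arc
    thread::scope(|s| {
        s.spawn(|_| println!("borrowed: {:?}", &data));
        s.spawn(|_| println!("also borrowed: {:?}", &data));
    })
    .unwrap();
}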

ArrayQueue

ArrayQueue is a lock-free queue backed by a fixed-size array. It is ideal for scenarios where the maximum number of elements is known in advance. Example:

use crossbeam_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    let queue = Arc::new(ArrayQueue::new(100));
    let mut handles = vec![];

    for i in 0..5 {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            for j in 0..20 {
                queue.push(i * 20 + j).unwrap();
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    while let Some(val) = queue.pop() {
        println!("{}", val);
    }
}

Multiple threads push elements into the ArrayQueue, and the elements are then drained from it.
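Because the capacity is fixed, push can fail: the rejected element is handed back so the caller can retry or drop it. A small sketch of that contract (assuming the crossbeam-queue 0.3 API, where push returns Result<(), T>):

use crossbeam_queue::ArrayQueue;

fn main() {
    let q = ArrayQueue::new(1);
    assert!(q.push(1).is_ok());
    // the queue is full, so the value comes back in the Err variant
    assert_eq!(q.push(2), Err(2));
    assert_eq!(q.pop(), Some(1));
}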

SegQueue

SegQueue is a lock-free queue with dynamic expansion that is suitable for scenarios with an indefinite number of elements. Example:

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    let queue = Arc::new(SegQueue::new());
    let mut handles = vec![];

    for i in 0..5 {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            for j in 0..20 {
                queue.push(i * 20 + j);
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    while let Some(val) = queue.pop() {
        println!("{}", val);
    }
}

SegQueue is used similarly to ArrayQueue, but is not limited to a fixed size.

Epoch-based garbage collection

Epoch-based garbage collection manages memory reclamation without locks. The key idea is to divide time into epochs and track when each thread is actively accessing shared data: an object is freed only once no active thread can still hold a reference to it.

What does epoch-based GC consist of:

  • Global epoch counter: tracks the current epoch for the whole program.

  • Thread-local epoch counters: each thread updates its own counter when it enters a critical section (pins itself).

  • Garbage lists: per-epoch lists of retired objects, freed once no pinned thread can still reach them.

Example:

use crossbeam_epoch::{self as epoch, Atomic};
use std::sync::atomic::Ordering;

struct Node {
    value: i32,
    next: Atomic<Node>,
}

fn main() {
    // build a two-node list: head -> 2 -> 1
    let head = Atomic::new(Node {
        value: 2,
        next: Atomic::new(Node {
            value: 1,
            next: Atomic::null(),
        }),
    });

    // pin the current thread: memory retired while this guard is alive
    // cannot be freed out from under us
    let guard = epoch::pin();

    let h = head.load(Ordering::Acquire, &guard);
    unsafe {
        if let Some(node) = h.as_ref() {
            println!("Value: {}", node.value);
        }
    }

    // retire the nodes: they are destroyed only after all threads pinned
    // in the current epoch have moved on
    unsafe {
        if let Some(node) = h.as_ref() {
            let next = node.next.load(Ordering::Acquire, &guard);
            if !next.is_null() {
                guard.defer_destroy(next);
            }
        }
        guard.defer_destroy(h);
    }
}

We built a simple lock-free data structure and let crossbeam's epoch-based GC (epoch::pin plus Guard::defer_destroy) reclaim its memory.

Usage examples

Multi-threaded event processing in a server system

Server applications have to process many events coming from different clients. Lock-free queues help avoid the delays that locks introduce:

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Event {
    id: u32,
    payload: String,
}

fn main() {
    let event_queue = Arc::new(SegQueue::new());
    let mut handles = vec![];

    // producer thread: adds events to the queue
    let producer_queue = Arc::clone(&event_queue);
    handles.push(thread::spawn(move || {
        for i in 0..100 {
            let event = Event {
                id: i,
                payload: format!("Event {}", i),
            };
            producer_queue.push(event);
            thread::sleep(Duration::from_millis(10));
        }
    }));

    // consumer thread: processes events from the queue
    let consumer_queue = Arc::clone(&event_queue);
    handles.push(thread::spawn(move || {
        let mut processed = 0;
        // the queue may be momentarily empty while the producer is still
        // working, so poll until all 100 events have been handled
        while processed < 100 {
            match consumer_queue.pop() {
                Some(event) => {
                    println!("Processing event {}: {}", event.id, event.payload);
                    processed += 1;
                }
                None => thread::sleep(Duration::from_millis(5)),
            }
        }
    }));

    for handle in handles {
        handle.join().unwrap();
    }
}

One thread adds events to the lock-free SegQueue, and another retrieves and processes them; since the queue can be momentarily empty, the consumer polls until it has seen every event.

Thread pool for web server

Thread pools are widely used in web servers to process client requests, and lock-free techniques find a use here too:

use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{self, Sender};

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // recv() errors once the sender is dropped, shutting the worker down
            match receiver.lock().unwrap().recv() {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Err(_) => break,
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Processing request {}", i);
        });
    }

    // give the workers a moment to drain the channel before main exits
    thread::sleep(std::time::Duration::from_millis(50));
}

The pool hands tasks to the workers over an mpsc channel. Note that this version is not fully lock-free: the receiving end is wrapped in a Mutex so the workers can share it.
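To stay on the lock-free theme, the Mutex-guarded receiver could be replaced with a shared SegQueue. A hedged sketch of such a worker loop (the names and the shutdown flag are illustrative; a production pool would also need graceful joining of all workers):

use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

// Illustrative worker loop: pull jobs from a lock-free queue and
// yield when it is empty, until a shutdown flag is raised.
fn worker_loop(jobs: Arc<SegQueue<Job>>, shutdown: Arc<AtomicBool>) {
    loop {
        match jobs.pop() {
            Some(job) => job(),
            None if shutdown.load(Ordering::Relaxed) => break,
            None => thread::yield_now(),
        }
    }
}

fn main() {
    let jobs: Arc<SegQueue<Job>> = Arc::new(SegQueue::new());
    let shutdown = Arc::new(AtomicBool::new(false));

    let worker = {
        let (jobs, shutdown) = (Arc::clone(&jobs), Arc::clone(&shutdown));
        thread::spawn(move || worker_loop(jobs, shutdown))
    };

    jobs.push(Box::new(|| println!("hello from a lock-free pool")));
    shutdown.store(true, Ordering::Relaxed);
    worker.join().unwrap();
}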

Concurrent Hash Map

With a concurrent hash map you can cache data shared between threads without managing locks yourself:

use std::sync::Arc;
use dashmap::DashMap;
use std::thread;

fn main() {
    let cache = Arc::new(DashMap::new());
    let mut handles = vec![];

    // threads that add data to the cache
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for j in 0..10 {
                cache.insert(i * 10 + j, format!("Value {}", j));
            }
        }));
    }

    // threads that read data from the cache
    for i in 0..5 {
        let cache = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for j in 0..10 {
                if let Some(value) = cache.get(&(i * 10 + j)) {
                    println!("Read from cache: {}", value);
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

We use DashMap here. Strictly speaking it is not lock-free internally (the map is split into shards, each guarded by its own RwLock), but there is no global lock, so concurrent inserts and reads from different threads rarely contend.
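DashMap's entry API makes read-modify-write updates convenient: only the shard holding the key is locked for the duration. A small sketch (the hit-counter map is illustrative):

use dashmap::DashMap;

fn main() {
    let hits: DashMap<&str, u32> = DashMap::new();

    // or_insert takes the shard lock for this key, so concurrent
    // updates to different keys rarely contend
    *hits.entry("home").or_insert(0) += 1;
    *hits.entry("home").or_insert(0) += 1;

    assert_eq!(*hits.get("home").unwrap(), 2);
}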


In my experience, lock-free data structures can cut task execution time by 30-50% compared with mutex-based equivalents.

