System Threads vs. Async Concurrency

Recently, the “async/await” concept in Rust has become quite popular. Of course, for the vast majority of tasks, this is the best option. Since it is oriented towards the fact that the system that processes network requests, for example, encounters a blocking I/O to the database. As a result, the best solution would be to use “async/await”, since it allows using one thread to process multiple requests. If the “async” function cannot be completed, for example, due to waiting for I/O, it can give control at the point of its call to “await”. And the “executor”, for example “Tokio”, can switch to another task.

However, there are cases when “async/await” is not suitable. For example, when you need to process a request with a minimum waiting time. For this reason, you need to use system threads. To control your task within a single thread, and not rely on the “executor” from Tokio to decide for you what task to perform next.

In order to demonstrate the difference between “async/await” and system threads, I have chosen a task from the Fintech area for order matching. It is implied that order matching on the exchange will be performed within one system thread, which will be attached to the processor core. Orders will be received into the matching system asynchronously from other threads.

The first problem I encountered was how to send orders from one thread to the matching system running in another thread. Tokio has “mpsc” channels, but they are not suitable for my task, since they are asynchronous. At first I turned to the standard library “std::sync::mpsc”, but it blocks the thread, both when sending and when receiving messages. As a result, I decided to use the “crossbeam” library, which allows sending and receiving messages without blocking the thread. Here is an example of code:

use crossbeam_queue::ArrayQueue;

let order_queue:Arc<ArrayQueue<Order>> = Arc::new(ArrayQueue::new(100));

// send from one thread
order_queue.push(order);

// receive from another thread
while let Some(order) = order_queue.pop() {
    // process order
}

The second problem is running my matching system in a separate thread without the help of “executors” like Tokio. For this I used the standard library “std::thread”. Here is an example of code:

let match_system_thread_handle = std::thread::spawn(move || {
    let mut matcher_system = OrderMatcher::new(crypto_currency_id, currency_id);
    loop {
        while let Some(order) = order_queue.pop() {
            matcher_system.add_order(order);
        };
        let order_matches = matcher_system.match_orders();
        for order_match in order_matches {
            let _ = order_match_queue.push(order_match);
        }
        std::thread::sleep(std::time::Duration::from_secs(1));
   }   
});

As you can see from the example above, I created a thread that will run forever. Inside the thread, I get orders from the “order_queue” queue, perform the matching, and send the result of the matching to another “order_match_queue” queue.

The third problem is to attach the thread to the processor core. For this I used the “core_affinity” library. Here is an example of the code:

let core_ids = core_affinity::get_core_ids().unwrap();
let core_id = core_ids[0];

let matcher_system = MatcherSystem::start(crypto_currency_id, currency_id, core_id);

impl MatcherSystem {
    pub fn start(crypto_currency_id: u64, currency_id: u64, core_id: CoreId) -> MatcherSystem {
        let _match_system_thread_handle = std::thread::spawn(move || {
            let ok = core_affinity::set_for_current(core_id);
            if ok {
                let mut matcher_system = OrderMatcher::new(crypto_currency_id, currency_id);
                loop {
                    // process orders
                }
            } else {
                panic!("Failed to set core affinity");
            }
        });
    }
}

As a result, I got an Open Source library for matching orders on the exchange, which works within one system thread. This allows you to control the matching task within one thread, rather than relying on the “executor” from Tokio. To clearly demonstrate this example, I implemented a minimal version of the library for rapid prototyping of a cryptocurrency exchange. Here is an example of code for using this library:

fn main() {
    let storage_system = Arc::new(StorageSystem::new());
    let mut assets_system =  AssetSystem::new(storage_system.clone());
    if assets_system.get_currencies().len() == 0 {
        let _ = assets_system.create_currency(Currency { id: 0, symbol: "USD".to_string() });
    }
    if assets_system.get_crypto_currencies().len() == 0 {
        let _ = assets_system.create_crypto_currency(CryptoCurrency { id: 0, symbol: "BTC".to_string() });
    }

    let assets_system = Arc::new(assets_system);
    let mut accounts_system = AccountSystem::new(storage_system.clone(), assets_system.clone());

    let currency_id = assets_system.get_currencies()[0].id;
    let crypto_currency_id = assets_system.get_crypto_currencies()[0].id;

    if storage_system.load_accounts().len() == 0 {
        let account1_id = accounts_system.create_account(Account { id: 0, name: "Alice".to_string(), timestamp: SystemTime::now() });
        let account2_id = accounts_system.create_account(Account { id: 0, name: "Bob".to_string(), timestamp: SystemTime::now() });
        accounts_system.add_currency_to_account(account1_id, currency_id, 100000.0);
        accounts_system.add_crypto_currency_to_account(account2_id, crypto_currency_id, 1.0);
    }
    let accounts = storage_system.load_accounts();
    let account1_id = accounts[0].id;
    let account2_id = accounts[1].id;

    let mut order_system = OrderSystem::new(storage_system.clone(), assets_system.clone());

    let core_ids = core_affinity::get_core_ids().unwrap();
    let core_id = core_ids[0];

    let matcher_system = MatcherSystem::start(crypto_currency_id, currency_id, core_id);
    let order1 = order_system.create_order(Order { id: 0, account_id: account1_id, trade_type: TradeType::Buy, price_type: PriceType::Market, execution_type: ExecutionType::Full, crypto_currency_id: crypto_currency_id, currency_id, quantity: 0.5,  status: OrderStatus::Open, timestamp: SystemTime::now()});
    let order2 = order_system.create_order(Order { id: 0, account_id: account2_id, trade_type: TradeType::Sell, price_type: PriceType::Limit(50000.00), execution_type: ExecutionType::Partial, crypto_currency_id: crypto_currency_id, currency_id, quantity: 1.0,  status: OrderStatus::Open, timestamp: SystemTime::now()});
    let _ = matcher_system.add_order(order1);
    let _ = matcher_system.add_order(order2);
    print_accounts(storage_system.clone());
    loop {
        while let Some(order_match) = matcher_system.get_order_match() {
           tracing::info!("OrderMatch: Buy Order Id: {} Sell Order Id: {} Quantity: {} Price: {}", order_match.buy_order_id, order_match.sell_order_id, order_match.quantity, order_match.price);
           order_system.create_order_history(&order_match, &mut accounts_system);
           print_accounts(storage_system.clone());
           if storage_system.get_account_currency(account1_id, currency_id).unwrap().balance > 0.0 {
               let order = order_system.create_order(Order { id: 0, account_id: account1_id, trade_type: TradeType::Buy, price_type: PriceType::Market, execution_type: ExecutionType::Full, crypto_currency_id: crypto_currency_id, currency_id, quantity: 0.5,  status: OrderStatus::Open, timestamp: SystemTime::now()});
               matcher_system.add_order(order);
           }
        }
        std::thread::sleep(std::time::Duration::from_secs(1));
    }

}

fn print_accounts(storage_system: Arc<StorageSystem>) {
    for account in storage_system.load_accounts() {

        let datetime: DateTime<Local> = account.timestamp.into();
        tracing::info! {
            "AccountId: {} Name: {} Timestamp: {}",account.id,  account.name, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
        };

        for account_currency in storage_system.get_account_currency_by_account_id(account.id) {
            tracing::info! {
                "CurrencyId: {} Symbol: {} Balance: {:.2}", account_currency.id, storage_system.get_currency(account_currency.currency_id).unwrap().symbol, account_currency.balance
            };

            for account_currency_history in storage_system.get_currency_history_by_account_id_account_currency_id(account.id, account_currency.id) {
                let datetime: DateTime<Local> = account_currency_history.timestamp.into();
                tracing::info! {
                    "CurrencyHistoryId: {} Balance: {:.2} Timestamp: {}", account_currency_history.id,  account_currency_history.balance, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
                };
            }
        }
        for account_crypto_currency in storage_system.get_account_crypto_currencies_by_account_id(account.id) {
            tracing::info! {
                "CryptoCurrencyId: {} {} Amount: {}", account_crypto_currency.id, storage_system.get_crypto_currency(account_crypto_currency.crypto_currency_id).unwrap().symbol, account_crypto_currency.quantity
            };

            for account_crypto_currency_history in storage_system.get_crypto_currency_history_by_account_id_crypto_currency_id(account.id, account_crypto_currency.crypto_currency_id) {
                let datetime: DateTime<Local> = account_crypto_currency_history.timestamp.into();
                tracing::info! {
                    "CryptoCurrencyHistoryId: {} Quantity: {} Timestamp: {}", account_crypto_currency_history.id,  account_crypto_currency_history.quantity, datetime.format("%Y-%m-%d %H:%M:%S").to_string()
                };
            }
        }
    }

}

Example of work:

2024-07-08T14:55:28.291721Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.291953Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 100000.00
2024-07-08T14:55:28.292068Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.292168Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:28.292261Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 0.00
2024-07-08T14:55:28.292380Z  INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 1
2024-07-08T14:55:28.292476Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.288109Z  INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:29.288289Z  INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 1, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282877806 }, status: Open }
2024-07-08T14:55:29.288362Z  INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 1.0, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:29.288436Z  INFO ThreadId(02) src\matcher.rs:157: After Matching
2024-07-08T14:55:29.288492Z  INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:29.292973Z  INFO ThreadId(01) examples\trade-engine.rs:64: OrderMatch: Buy Order Id: 1 Sell Order Id: 2 Quantity: 0.5 Price: 50000
2024-07-08T14:55:29.312581Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.312779Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 75000.00
2024-07-08T14:55:29.312920Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313002Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 2 Balance: 75000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313127Z  INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 2 BTC Amount: 0.5
2024-07-08T14:55:29.313255Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 2 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313335Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313454Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 25000.00
2024-07-08T14:55:29.313579Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 3 Balance: 25000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:29.313707Z  INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 0.5
2024-07-08T14:55:29.313814Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:29.313881Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 3 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.288721Z  INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:30.288946Z  INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 3, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241293139637 }, status: Open }
2024-07-08T14:55:30.289023Z  INFO ThreadId(02) src\matcher.rs:163: Sell Order: Order { id: 2, account_id: 2, trade_type: Sell, price_type: Limit(50000.0), execution_type: Partial, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241282892932 }, status: Open }
2024-07-08T14:55:30.315944Z  INFO ThreadId(01) examples\trade-engine.rs:64: OrderMatch: Buy Order Id: 3 Sell Order Id: 2 Quantity: 0.5 Price: 50000
2024-07-08T14:55:30.334073Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 1 Name: Alice Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.334267Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 1 Symbol: USD Balance: 50000.00
2024-07-08T14:55:30.334448Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 1 Balance: 100000.00 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.334571Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 2 Balance: 75000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.334686Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 4 Balance: 50000.00 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.334863Z  INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 2 BTC Amount: 1
2024-07-08T14:55:30.335030Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 2 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335130Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 4 Quantity: 1 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.335217Z  INFO ThreadId(01) examples\trade-engine.rs:81: AccountId: 2 Name: Bob Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.335340Z  INFO ThreadId(01) examples\trade-engine.rs:86: CurrencyId: 2 Symbol: USD Balance: 50000.00
2024-07-08T14:55:30.335495Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 3 Balance: 25000.00 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335588Z  INFO ThreadId(01) examples\trade-engine.rs:92: CurrencyHistoryId: 5 Balance: 50000.00 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:30.335720Z  INFO ThreadId(01) examples\trade-engine.rs:98: CryptoCurrencyId: 1 BTC Amount: 0
2024-07-08T14:55:30.335846Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 1 Quantity: 1 Timestamp: 2024-07-08 19:55:28
2024-07-08T14:55:30.335923Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 3 Quantity: 0.5 Timestamp: 2024-07-08 19:55:29
2024-07-08T14:55:30.335993Z  INFO ThreadId(01) examples\trade-engine.rs:104: CryptoCurrencyHistoryId: 5 Quantity: 0 Timestamp: 2024-07-08 19:55:30
2024-07-08T14:55:31.289822Z  INFO ThreadId(02) src\matcher.rs:157: Before Matching
2024-07-08T14:55:31.290319Z  INFO ThreadId(02) src\matcher.rs:160: Buy Order: Order { id: 4, account_id: 1, trade_type: Buy, price_type: Market, execution_type: Full, crypto_currency_id: 1, currency_id: 1, quantity: 0.5, timestamp: SystemTime { intervals: 133649241303360887 }, status: Open }
2024-07-08T14:55:31.290590Z  INFO ThreadId(02) src\matcher.rs:157: After Matching

Library features:

  • AccountSystem: accounts, currencies, cryptocurrencies, history

  • AssetSystem: currencies, cryptocurrencies

  • OrderSystem: orders, history

  • MatcherSystem: full or partial order matching (only for Market and Sell limit purchase)

  • StorageSystem: redb fast, ACID, built-in key-value storage

TODO

  • MatcherSystem: full or partial matching (for all order types)

  • StorageSystem: sharding, distributed transactions, distributed storage

The source code of the project is located at GitHub. If you want to help me with the development, write to me in Telegram @ievkz or Discord @igumnovnsk.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *