Asynchronous Rust in three parts. Part two: Tasks

In the introduction we said that async/await is about futures and tasks. In Part 1 we looked at futures, and now it's time for tasks. Fortunately, we have already met them, although we never called them that. The latest version of our main loop in Part 1 looked like this:

let mut joined_future = Box::pin(future::join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {
    …
}

Playground #1

The joined_future above is the simplest example of a task: a top-level future that is owned and polled by the main loop. Here we have only one task, but nothing prevents us from having more. And if we had a collection of tasks, we could add to it at runtime.

That is what tokio::task::spawn does. We can rewrite our original Tokio example using spawn instead of join_all:

#[tokio::main]
async fn main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(tokio::task::spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await.unwrap();
    }
}

Playground #2

foo is still an async fn, but otherwise this is very similar to our original thread::spawn example. Like threads, but unlike ordinary futures, tasks start running in the background as soon as spawn is called, so applying .await to a task handle works like calling join on a thread handle. Network services often use a main loop that listens for new connections and spawns a new thread to handle each one. Async tasks let us use the same pattern without the overhead of threads. That is what we will do in Part 3.
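The difference between an ordinary future and a running task can be seen without any runtime at all. The following is a minimal sketch using only the standard library. NoopWake, STARTED, set_started and laziness_demo are hypothetical names invented for this illustration, and NoopWake stands in for futures::task::noop_waker. It shows that calling an async fn runs nothing until somebody polls the returned future.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// A Waker that does nothing (an assumption standing in for noop_waker).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Records whether the body of `set_started` has run.
static STARTED: AtomicBool = AtomicBool::new(false);

async fn set_started() {
    STARTED.store(true, Ordering::SeqCst);
}

/// Returns whether STARTED was set (before polling, after polling).
fn laziness_demo() -> (bool, bool) {
    STARTED.store(false, Ordering::SeqCst);
    // Calling the async fn only constructs the future.
    let future = pin!(set_started());
    let before = STARTED.load(Ordering::SeqCst);

    // Polling is what actually runs the body.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let poll = future.poll(&mut context);
    assert!(poll.is_ready());

    (before, STARTED.load(Ordering::SeqCst))
}

fn main() {
    let (before, after) = laziness_demo();
    println!("started before poll: {before}, after poll: {after}");
}
```

A spawned task differs in that the runtime takes responsibility for polling it, so the caller does not have to await the handle for the work to happen.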

Using the main loop from Part 1, we will write our own spawn. We'll do it in three steps: first we will make room for multiple tasks in the main loop, then we will write a spawn function to add new tasks, and finally we will implement JoinHandle.

Dyn

We already know how to poll multiple futures at once, since that's what we did when we implemented JoinAll. How much of that code can we copy and paste?

One thing that needs to change is the element type of the Vec of futures. Our JoinAll used Vec<Pin<Box<F>>>, where F is a generic type parameter, but our main function has no type parameters. We also want the new Vec to be able to hold futures of different types at the same time. The Rust feature we need here is dynamic trait objects, dyn Trait. Let's start with a type alias so that we don't have to write it out several times:

type DynFuture = Pin<Box<dyn Future<Output = ()>>>;

Note that DynFuture has no type parameters. It can hold any boxed future, as long as its Output is (). Now, instead of building a joined_future in main, we will build a Vec<DynFuture> and start calling these futures tasks:

fn main() {
    let mut tasks: Vec<DynFuture> = Vec::new();
    for n in 1..=10 {
        tasks.push(Box::pin(foo(n)));
    }
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    …

Playground #3

We can manage the Vec<DynFuture> using retain_mut, as JoinAll did, removing futures from the Vec as soon as they return Ready. We need to turn the while loop into a loop/break so that we can poll, then check whether we are done, and then handle Wakers. Now it looks like this:

    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    loop {
        // Poll each task, removing any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        tasks.retain_mut(is_pending);

        // If there are no tasks left, we're done.
        if tasks.is_empty() {
            break;
        }

        // Otherwise handle WAKE_TIMES and sleep as in Part One...
        …

Playground #3

It works fine, although it doesn't feel like much of an achievement. For the most part we just copy-pasted the code from JoinAll and corrected the types. But with this we have laid an important foundation.
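To make the point about trait objects concrete, here is a small standard-library-only sketch of a Vec<DynFuture> holding futures of three different concrete types. print_number, print_text, poll_all_once and NoopWake are hypothetical names used only for this illustration. None of these futures ever blocks, so a single pass finishes them all.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()>>>;

// A Waker that does nothing (an assumption standing in for noop_waker).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Two async fns: each one returns its own anonymous future type.
async fn print_number(n: u64) {
    println!("number {n}");
}

async fn print_text(text: String) {
    println!("text {text}");
}

/// Polls every task once, drops the finished ones, and returns how many
/// tasks are still pending.
fn poll_all_once(tasks: &mut Vec<DynFuture>) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
    tasks.len()
}

fn main() {
    let mut tasks: Vec<DynFuture> = Vec::new();
    // Three different concrete types, one element type.
    tasks.push(Box::pin(print_number(1)));
    tasks.push(Box::pin(print_text(String::from("two"))));
    tasks.push(Box::pin(async { println!("an async block") }));
    let remaining = poll_all_once(&mut tasks);
    println!("{remaining} tasks still pending");
}
```

With a generic Vec<Pin<Box<F>>> the second and third push would be rejected, because every element would have to share the single type F.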

Note that this loop behaves slightly differently from how tasks work in Tokio. A Rust program normally exits when the main thread finishes, without waiting for background threads, and likewise a Tokio runtime shuts down when the main task finishes, without waiting for background tasks. Our version of the main loop, however, keeps running until all tasks are done. It also assumes that tasks have no return value. We'll fix both of these when we get to JoinHandle, but first let's do spawn.

Spawn

The spawn function should add a new future to the tasks Vec. How should it get access to the Vec? It would be convenient if we could do what we did with WAKE_TIMES and make TASKS a global variable protected by a Mutex, but this time that won't work. Our main loop only locks WAKE_TIMES after it has finished polling, but if we made TASKS global, the main loop would hold the lock during polling, and any task that called spawn would deadlock.
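The deadlock risk can be sketched safely with try_lock, which reports failure instead of blocking. This is an illustration only: TASKS holds plain integers here instead of futures, and try_spawn and spawn_while_locked are hypothetical helpers. The guard held by spawn_while_locked plays the role of the main loop holding the lock while it polls.

```rust
use std::sync::{Mutex, TryLockError};

// Stand-in for a global task list (integers instead of futures).
static TASKS: Mutex<Vec<u32>> = Mutex::new(Vec::new());

/// A stand-in for `spawn` that reports failure instead of deadlocking.
fn try_spawn(task: u32) -> bool {
    match TASKS.try_lock() {
        Ok(mut tasks) => {
            tasks.push(task);
            true
        }
        // The lock is already held. A plain lock() call here would never
        // return (or would panic), because std's Mutex is not reentrant.
        Err(TryLockError::WouldBlock) => false,
        Err(TryLockError::Poisoned(_)) => panic!("mutex poisoned"),
    }
}

/// Returns (result of spawning while the list is locked, result after
/// the lock has been released).
fn spawn_while_locked() -> (bool, bool) {
    // Pretend to be the main loop, holding the lock while it "polls".
    let guard = TASKS.lock().unwrap();
    let during = try_spawn(1);
    drop(guard);
    let after = try_spawn(2);
    (during, after)
}

fn main() {
    let (during, after) = spawn_while_locked();
    println!("spawn while locked: {during}, after unlocking: {after}");
}
```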

We'll get around this by keeping two separate lists. We'll leave tasks where it is, as a local variable of the main loop, and add a global list called NEW_TASKS. The spawn function will push tasks onto NEW_TASKS:

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

fn spawn<F: Future<Output = ()>>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}

Playground #4

Now the main loop can… wait a minute, it doesn't compile:

error[E0277]: `(dyn Future<Output = ()> + 'static)` cannot be sent between threads safely
    --> tasks_no_send_no_static.rs:43:19
     |
43   | static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());
     |                   ^^^^^^^^^^^^^^^^^^^^^ `(dyn Future<Output = ()> + 'static)` cannot be sent between threads
     |
     = help: the trait `Send` is not implemented for `(dyn Future<Output = ()> + 'static)`, which is required by
             `Mutex<Vec<Pin<Box<(dyn Future<Output = ()> + 'static)>>>>: Sync`

Global variables in Rust must be Sync, and Mutex<T> is Sync only when T is Send. DynFuture must promise that it implements Send:

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

So, now… nope, it still doesn't compile:

error[E0277]: `F` cannot be sent between threads safely
  --> src/main.rs:46:36
   |
46 |     NEW_TASKS.lock().unwrap().push(Box::pin(future));
   |                                    ^^^^^^^^^^^^^^^^ `F` cannot be sent between threads safely
   |
   = note: required for the cast from `Pin<Box<F>>` to
           `Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>>`

Playground #5

Fair enough, spawn has to make the same promise:

fn spawn<F: Future<Output = ()> + Send>(future: F) { … }

Well, are you satisfied? Nope:

error[E0310]: the parameter type `F` may not live long enough
  --> src/main.rs:46:36
   |
46 |     NEW_TASKS.lock().unwrap().push(Box::pin(future));
   |                                    ^^^^^^^^^^^^^^^^
   |                                    |
   |                                    the parameter type `F` must be valid for the static lifetime...
   |                                    ...so that the type `F` will meet its required lifetime bounds

Playground #6

Global variables have the 'static lifetime, which means they cannot hold pointers to anything that might be deallocated. Boxed trait objects such as DynFuture are 'static by default, but type parameters such as F are not. If spawn wants to put F into a global variable, it must promise that F is 'static:

fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) { … }

Finally it compiles. That was a lot of ceremony just to create a global Vec, but let's think about what exactly we have created: rather than a "Vec of futures", NEW_TASKS is a "Vec of thread-safe futures that contain no potentially dangling pointers". Rust doesn't have a garbage collector, so dangling pointers would lead to memory corruption bugs, and it's nice to be able to say that we don't want them.
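What the two bounds accept and reject can be sketched as follows, using only the standard library. assert_sync and queue_examples are hypothetical helpers added for the illustration. The rejected cases are left commented out so that the block compiles, and this sketch never polls the queued future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}

// Hypothetical helper: a call to it compiles only if T is Sync.
fn assert_sync<T: Sync>() {}

/// Queues one acceptable future and returns the queue length.
fn queue_examples() -> usize {
    // Compiles because DynFuture is Send, which makes the Mutex Sync.
    assert_sync::<Mutex<Vec<DynFuture>>>();

    // Accepted: `async move` takes ownership of `message`, so the future
    // holds no borrowed data and is 'static.
    let message = String::from("hello");
    spawn(async move { println!("{message}") });

    // Rejected if uncommented: the future would borrow a local variable,
    // so it is not 'static.
    // let local = String::from("borrowed");
    // spawn(async { println!("{}", &local) });

    // Rejected if uncommented: Rc is not Send, so neither is the future.
    // let shared = std::rc::Rc::new(1);
    // spawn(async move { println!("{shared}") });

    NEW_TASKS.lock().unwrap().len()
}

fn main() {
    println!("queued tasks: {}", queue_examples());
}
```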

So… now the main loop can move tasks from NEW_TASKS into tasks. It doesn't take much code, but there are a couple of pitfalls, and this time they are runtime bugs rather than compiler errors. First, we need to poll new tasks as we collect them, rather than waiting for the next iteration of the main loop, so that they get a chance to register a wakeup before we sleep. Second, we need to make sure that NEW_TASKS is unlocked while we poll, or else we will recreate the very deadlock we were trying to avoid. Here's the expanded main loop:

loop {
    // Poll each task, removing any that are Ready.
    let is_pending = |task: &mut DynFuture| {
        task.as_mut().poll(&mut context).is_pending()
    };
    tasks.retain_mut(is_pending);

    // Collect new tasks, poll them, and keep the ones that are Pending.
    loop {
        let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
            break;
        };
        // Polling this task could spawn more tasks, so it's important that
        // NEW_TASKS isn't locked here.
        if task.as_mut().poll(&mut context).is_pending() {
            tasks.push(task);
        }
    }

    // If there are no tasks left, we're done.
    if tasks.is_empty() {
        break;
    }

    // Otherwise handle WAKE_TIMES and sleep as in Part One...
    …

Playground #7
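The shape of the inner loop matters for the locking rule. In a let-else statement the temporary MutexGuard is dropped at the end of the statement, whereas a while let keeps the temporaries of its scrutinee alive for the whole loop body. The sketch below uses try_lock to observe the difference without deadlocking. QUEUE, drain_with_let_else and drain_with_while_let are hypothetical names, and the queue holds integers instead of tasks.

```rust
use std::sync::Mutex;

static QUEUE: Mutex<Vec<u32>> = Mutex::new(Vec::new());

/// Drains QUEUE with let-else. Returns true if QUEUE was unlocked every
/// time the loop body ran.
fn drain_with_let_else() -> bool {
    QUEUE.lock().unwrap().extend([1, 2, 3]);
    let mut always_unlocked = true;
    loop {
        let Some(_item) = QUEUE.lock().unwrap().pop() else {
            break;
        };
        // The guard was a temporary of the `let` statement and is gone.
        always_unlocked &= QUEUE.try_lock().is_ok();
    }
    always_unlocked
}

/// Drains QUEUE with while-let. Returns true if QUEUE was unlocked every
/// time the loop body ran.
fn drain_with_while_let() -> bool {
    QUEUE.lock().unwrap().extend([1, 2, 3]);
    let mut always_unlocked = true;
    while let Some(_item) = QUEUE.lock().unwrap().pop() {
        // The guard is still alive here, so a spawn call made from this
        // body would deadlock. try_lock just reports the problem.
        always_unlocked &= QUEUE.try_lock().is_ok();
    }
    always_unlocked
}

fn main() {
    println!("let-else body unlocked: {}", drain_with_let_else());
    println!("while-let body unlocked: {}", drain_with_while_let());
}
```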

With that groundwork in place, we can define an async_main function and let it take charge of spawning tasks, instead of hardcoding the task list in main:

async fn async_main() {
    // The main loop currently waits for all tasks to finish.
    for n in 1..=10 {
        spawn(foo(n));
    }
}

fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut tasks: Vec<DynFuture> = vec![Box::pin(async_main())];
    …

Playground #7

It works! Because of the way we push tasks onto NEW_TASKS and pop them off, the order of the output has changed. We could fix that, but let's leave it as it is. It's a good reminder that, like threads, concurrent tasks can run in any order.

JoinHandle

As we mentioned earlier, Tokio supports background tasks that don't block program exit, and it also supports tasks that return values. Both features require tokio::task::spawn to return a tokio::task::JoinHandle, just as thread::spawn returns a thread::JoinHandle. To get the same functionality, we will implement our own JoinHandle. Also, since the only kind of blocking we have seen so far is sleep, we will get to know a new form of blocking and an unusual bug that comes with it.

JoinHandle has to communicate between two tasks: the one that is finishing and the one that is waiting for it to finish. The waiting task needs to put its Waker somewhere so that the finishing task can invoke it, and the finishing task needs to put its return value T somewhere so that the waiting task can receive it. We never need both at the same time, so we can use an enum. It needs to be shared and mutable, so we'll wrap it in an Arc and a Mutex:

enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}

struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}

Playground #8

Waiting for a task to finish happens by awaiting its JoinHandle, so JoinHandle needs to implement Future. The catch is that the waiting task wants to take ownership of the value T inside JoinState::Ready(T), but Arc<Mutex<JoinState<T>>> only gives access to the JoinState by reference, so we can't move T out and "leave a hole" behind the reference. Instead we replace the whole JoinState using mem::replace:

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> {
        let mut guard = self.state.lock().unwrap();
        // Use JoinState::Done as a placeholder, to take ownership of T.
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                // Replace the previous Waker, if any.
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}

Playground #8
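The mem::replace trick is easier to see in isolation. Below is a reduced sketch with no futures involved. Slot and take_value are hypothetical names for this illustration, and unlike the poll method above, take_value puts the old state back when there is nothing to take.

```rust
use std::mem;
use std::sync::Mutex;

// A reduced stand-in for JoinState, without the Waker.
enum Slot<T> {
    Pending,
    Ready(T),
    Done,
}

/// Moves the value out of the slot if it is Ready, leaving Done behind.
fn take_value<T>(slot: &Mutex<Slot<T>>) -> Option<T> {
    let mut guard = slot.lock().unwrap();
    // We only have `&mut Slot<T>` through the guard, so we cannot move
    // out of it directly. Swapping in a placeholder gives us ownership
    // of the old state.
    match mem::replace(&mut *guard, Slot::Done) {
        Slot::Ready(value) => Some(value),
        other => {
            // Nothing to take: restore the previous state.
            *guard = other;
            None
        }
    }
}

fn main() {
    let pending: Mutex<Slot<String>> = Mutex::new(Slot::Pending);
    println!("pending slot: {:?}", take_value(&pending));

    let ready = Mutex::new(Slot::Ready(String::from("result")));
    println!("first take: {:?}", take_value(&ready));
    println!("second take: {:?}", take_value(&ready));
}
```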

The future passed to spawn knows nothing about JoinState, so we need a wrapper that handles the return value and invokes the Waker if there is one:

async fn wrap_with_join_state<F: Future>(
    future: F,
    join_state: Arc<Mutex<JoinState<F::Output>>>,
) {
    let value = future.await;
    let mut guard = join_state.lock().unwrap();
    if let JoinState::Awaited(waker) = &*guard {
        waker.wake_by_ref();
    }
    *guard = JoinState::Ready(value)
}

Playground #8

Now we can create a JoinState and use our wrapper in spawn, so that it accepts any output type and returns a JoinHandle:

fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let join_state = Arc::new(Mutex::new(JoinState::Unawaited));
    let join_handle = JoinHandle {
        state: Arc::clone(&join_state),
    };
    let task = Box::pin(wrap_with_join_state(future, join_state));
    NEW_TASKS.lock().unwrap().push(task);
    join_handle
}

Playground #8

We will collect the JoinHandles and .await them in async_main, much as we did with Tokio tasks earlier:

async fn async_main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await;
    }
}

Playground #8

Now that we can explicitly wait for a task, we would like the main loop to terminate once the main task completes. Let's separate the main task from tasks and rename the task list to other_tasks:

fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut main_task = Box::pin(async_main());
    let mut other_tasks: Vec<DynFuture> = Vec::new();
    loop {
        // Poll the main task and exit immediately if it's done.
        if main_task.as_mut().poll(&mut context).is_ready() {
            return;
        }
        // Poll other tasks and remove any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        other_tasks.retain_mut(is_pending);
        // Handle NEW_TASKS and WAKE_TIMES...

Playground #8

Done! We made a lot of changes at once and, fortunately, it builds. It even almost works. The program prints the correct output, but then it panics:

…
end 3
end 2
end 1
thread 'main' panicked at src/main.rs:143:50:
sleep forever?

Playground #8

This is the unusual bug we mentioned earlier.

Waker

The panic comes from this line, which has been in the main loop since Part 1:

let next_wake = wake_times.keys().next().expect("sleep forever?");

Playground #8

The loop is about to go to sleep, so it asks for the next wake time, but the WAKE_TIMES tree is empty. Previously, we could assume that if any task returned Pending, then there had to be at least one wake time, because Sleep was the only source of blocking. But now we have a second source: JoinHandle. If a JoinHandle is Pending, that could be because another task is sleeping and has registered a wake time. But it could also be that the other task is about to return Ready and we just haven't polled it yet. It depends on the order of the tasks in the list. If a task at the front of the list is waiting on a task at the back, we can end up with Pending tasks and no scheduled wakeups.

This is exactly what happened. The main task is most likely blocked on the first JoinHandle. The main loop wakes up and polls the main task, and that JoinHandle is still Pending. Then it polls all the tasks in other_tasks. Each of them prints its end message, signals its JoinHandle, and returns Ready. At that point we need to poll the main task again instead of trying to sleep. How do we tell the main loop that? We could create another static flag, but there is a better option: we can use our Waker.

Ever since Part 1 we have been using futures::task::noop_waker to supply a dummy Waker. When Sleep was the only source of blocking, our tasks had no way to unblock other tasks, and all we needed from the Waker was a placeholder so that the code would compile. But the situation has changed. Our wrap_with_join_state function already invokes Wakers correctly when tasks finish, and it would be nice to know when that happens. Why not write our own Waker?

Waker implements From<Arc<W>>, where W is any type that implements the Wake trait (and is Send, Sync and 'static). Wake in turn requires a wake method. That method takes self as Arc<Self>, which is a little unusual, but otherwise it lets us do whatever we want. The simplest option is to make something like an Arc<Mutex<bool>> and set it to true when a task receives a wakeup. That's not much different from a static flag, but it lets futures written by other people invoke our Waker without knowing how the main loop is implemented. Here is our wrapper around bool:

struct AwakeFlag(Mutex<bool>);

impl AwakeFlag {
    fn check_and_clear(&self) -> bool {
        let mut guard = self.0.lock().unwrap();
        let check = *guard;
        *guard = false;
        check
    }
}

impl Wake for AwakeFlag {
    fn wake(self: Arc<Self>) {
        *self.0.lock().unwrap() = true;
    }
}

Playground #9
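To see the Wake machinery on its own, here is a small sketch that counts wakeups instead of setting a flag. WakeCounter and count_wakes are hypothetical names for this illustration. It relies on the default Wake::wake_by_ref, which clones the Arc and calls wake.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

// Counts how many times any Waker built from it has been invoked.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Builds a Waker from a WakeCounter, wakes it three different ways, and
/// returns the number of wakeups observed.
fn count_wakes() -> usize {
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    // Keep one Arc for ourselves and hand a clone to the Waker.
    let waker = Waker::from(Arc::clone(&counter));

    waker.wake_by_ref(); // wake without consuming the Waker
    waker.clone().wake(); // clones share the same underlying counter
    waker.wake(); // consumes the original Waker

    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("wakeups observed: {}", count_wakes());
}
```

Code that receives the Waker only sees the standard Waker interface, which is why futures written elsewhere can notify a main loop they know nothing about.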

We can create an AwakeFlag, and then a Waker from it, at the start of main:

fn main() {
    let awake_flag = Arc::new(AwakeFlag(Mutex::new(false)));
    let waker = Waker::from(Arc::clone(&awake_flag));
    let mut context = Context::from_waker(&waker);
    …

Playground #9

And finally, we can add that same check to the main loop:

// Collect new tasks, poll them, and keep the ones that are Pending.
loop {
    let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
        break;
    };
    if task.as_mut().poll(&mut context).is_pending() {
        other_tasks.push(task);
    }
}
// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
    continue;
}
// Otherwise handle WAKE_TIMES and sleep as in Part One...

Playground #9

It works! We have successfully implemented tasks.
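For readers who want to experiment outside the playground, the pieces of this part can be condensed into one standard-library-only sketch. It is a reduced variation, not the article's exact code: there is no Sleep and no WAKE_TIMES, the wrapper is inlined into spawn as an async block, AwakeFlag uses an AtomicBool instead of Mutex<bool>, and run is a hypothetical driver function.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}

struct JoinHandle<T>(Arc<Mutex<JoinState<T>>>);

impl<T> Future for JoinHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> {
        let mut guard = self.0.lock().unwrap();
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}

fn spawn<T: Send + 'static>(
    future: impl Future<Output = T> + Send + 'static,
) -> JoinHandle<T> {
    let state = Arc::new(Mutex::new(JoinState::Unawaited));
    let task_state = Arc::clone(&state);
    // Inlined wrapper: store the result and wake the waiting task, if any.
    NEW_TASKS.lock().unwrap().push(Box::pin(async move {
        let value = future.await;
        let mut guard = task_state.lock().unwrap();
        if let JoinState::Awaited(waker) = &*guard {
            waker.wake_by_ref();
        }
        *guard = JoinState::Ready(value);
    }));
    JoinHandle(state)
}

// Variation: an AtomicBool instead of Mutex<bool>.
struct AwakeFlag(AtomicBool);

impl Wake for AwakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Hypothetical driver: runs `main_future` plus anything it spawns, and
// returns as soon as the main future is Ready.
fn run<T>(main_future: impl Future<Output = T>) -> T {
    let awake_flag = Arc::new(AwakeFlag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&awake_flag));
    let mut context = Context::from_waker(&waker);
    let mut main_task = Box::pin(main_future);
    let mut other_tasks: Vec<DynFuture> = Vec::new();
    loop {
        if let Poll::Ready(value) = main_task.as_mut().poll(&mut context) {
            return value;
        }
        other_tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
        loop {
            let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
                break;
            };
            if task.as_mut().poll(&mut context).is_pending() {
                other_tasks.push(task);
            }
        }
        // There are no timers here, so without a wakeup nothing can progress.
        assert!(awake_flag.0.swap(false, Ordering::SeqCst), "sleep forever?");
    }
}

async fn async_main() -> u64 {
    let handles: Vec<_> = (1..=10).map(|n: u64| spawn(async move { n * 2 })).collect();
    let mut sum = 0;
    for handle in handles {
        sum += handle.await;
    }
    sum
}

fn main() {
    println!("sum = {}", run(async_main()));
}
```

The main task goes Pending on its first JoinHandle, the spawned tasks finish during the same pass, and the flag set by wake_by_ref is what sends the loop back to poll the main task again.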

It's time to move on: instead of sleeping and printing text to the screen, we will look at real I/O and use spawn to handle network connections.
