Asynchronous Rust in three parts. Part two: Tasks
In the introduction we said that async/await is about futures and tasks. In Part One we looked at futures, and now it's time for tasks. Fortunately, we've already met them, although we never called them that. The latest version of our main loop in Part One looked like this:
let mut joined_future = Box::pin(future::join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {
    …
}
The joined_future above is the simplest example of a task: a top-level future that the main loop polls. Here we have only one task, but nothing stops us from having more, and if we kept a collection of tasks, we could add to it at runtime.
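To make the idea concrete, here is a minimal sketch of a main loop that drives a single task. It is an illustration under a few assumptions, not the article's code. It uses only the standard library, so instead of futures::task::noop_waker it builds a do-nothing Waker from a hypothetical NoopWake type via std's Wake trait. The name run_one_task is also illustrative. With nothing to sleep on, the loop simply polls again until the task is Ready.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker, a std-only stand-in for futures::task::noop_waker.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Illustrative main loop for one task: poll the top-level future until it's Ready.
fn run_one_task<F: Future>(task: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut task = pin!(task);
    loop {
        if let Poll::Ready(value) = task.as_mut().poll(&mut context) {
            return value;
        }
        // Part One's loop would sleep until the next wake-up time here;
        // this sketch simply polls again.
    }
}

fn main() {
    let answer = run_one_task(async { 6 * 7 });
    println!("{answer}"); // prints 42
}
```

Because NoopWake ignores wake-ups, this loop busy-polls. The real loop sleeps until the next wake-up time instead.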
That's what tokio::task::spawn does. We can rewrite our original Tokio example using spawn instead of join_all:
#[tokio::main]
async fn main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(tokio::task::spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await.unwrap();
    }
}
foo is still an async fn, but otherwise this is very similar to our original thread::spawn example. Like threads, but unlike ordinary futures, tasks start running in the background as soon as we call spawn. That's why using .await on a task handle works like calling join on a thread handle. Network services often use a main loop that listens for new connections and spawns a new thread to handle each one. Async tasks let us use the same approach without the overhead of threads. That's what we'll do in Part Three.
Starting from the main loop in Part One, we'll write our own spawn in three steps. First we'll make room for multiple tasks in the main loop. Then we'll write a spawn function that adds new tasks. Finally, we'll implement JoinHandle.
Dyn
We already know how to poll multiple futures at once, since that's what we did when we implemented JoinAll. How much of that can we copy and paste?
One thing that has to change is the type of the Vec of futures. Our JoinAll used Vec<Pin<Box<F>>>, where F is a generic type parameter, but our main function doesn't have any type parameters. We also want the new Vec to hold futures of different types at the same time. The Rust feature we need here is dynamic trait objects, dyn Trait. Let's start with a type alias so that we don't have to write the type out repeatedly:
type DynFuture = Pin<Box<dyn Future<Output = ()>>>;
Note that DynFuture has no type parameters. We can put any boxed future in it, as long as its Output is (). Now, instead of building a joined_future in main, we'll build a Vec<DynFuture>, and from now on we'll call these futures tasks:
fn main() {
    let mut tasks: Vec<DynFuture> = Vec::new();
    for n in 1..=10 {
        tasks.push(Box::pin(foo(n)));
    }
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    …
We manage the Vec<DynFuture> with retain_mut, the same way JoinAll did, removing futures from the Vec as soon as they return Ready. We also need to turn the while loop into a loop/break. That way we can poll, check whether we're done, and then handle the Wakers. It now looks like this:
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
loop {
    // Poll each task, removing any that are Ready.
    let is_pending = |task: &mut DynFuture| {
        task.as_mut().poll(&mut context).is_pending()
    };
    tasks.retain_mut(is_pending);
    // If there are no tasks left, we're done.
    if tasks.is_empty() {
        break;
    }
    // Otherwise handle WAKE_TIMES and sleep as in Part One...
    …
This works, although it doesn't feel like much of an achievement. For the most part we just copied the code from JoinAll and fixed up the types. But we've laid an important foundation.
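If you want to experiment without Part One's Sleep and WAKE_TIMES, here is a self-contained sketch of the same retain_mut loop. It rests on a few assumptions. NoopWake and YieldTimes are hypothetical helpers: YieldTimes returns Pending a fixed number of times, waking itself each time, and then returns Ready. run_all busy-polls instead of sleeping. The point is that a single Vec<DynFuture> can hold futures of different concrete types.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()>>>;

// Hypothetical no-op waker (std-only stand-in for futures::task::noop_waker).
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical leaf future: returns Pending as many times as its counter says
// (waking itself each time, like a yield), then Ready.
struct YieldTimes(u32);
impl Future for YieldTimes {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        context.waker().wake_by_ref();
        Poll::Pending
    }
}

// Polls every task once per round, dropping the ones that are Ready.
// Returns how many rounds it took.
fn run_all(mut tasks: Vec<DynFuture>) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut rounds = 0;
    loop {
        rounds += 1;
        tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
        if tasks.is_empty() {
            return rounds;
        }
        // The real loop would handle WAKE_TIMES and sleep here.
    }
}

fn main() {
    let log = Arc::new(Mutex::new(Vec::new()));
    let mut tasks: Vec<DynFuture> = Vec::new();
    for n in 1..=3 {
        let log = Arc::clone(&log);
        tasks.push(Box::pin(async move {
            YieldTimes(n).await;
            log.lock().unwrap().push(n);
        }));
    }
    // A different concrete future type in the same Vec.
    tasks.push(Box::pin(YieldTimes(1)));
    let rounds = run_all(tasks);
    // prints "4 rounds, finished in order [1, 2, 3]"
    println!("{rounds} rounds, finished in order {:?}", log.lock().unwrap());
}
```

Tasks that need fewer polls drop out of the Vec earlier. Here the loop takes four rounds, and the async blocks finish in the order 1, 2, 3.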
Note that this loop behaves slightly differently from tasks in Tokio. A Rust program normally exits when the main thread finishes, without waiting for background threads. In the same way, Tokio shuts down when the main task finishes, without waiting for background tasks. Our main loop, however, currently keeps running until all tasks are done. It also assumes that tasks don't return anything. We'll fix both of these when we get to JoinHandle, but first let's write spawn.
Spawn
The spawn function needs to add new futures to the tasks Vec. How should it get access to that Vec? It would be convenient to do what we did with WAKE_TIMES and make TASKS a global variable protected by a Mutex, but that won't work this time. Our main loop locks WAKE_TIMES only after it has finished polling. If TASKS were global, though, the main loop would have to hold its lock while polling, and any task that called spawn would deadlock.
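A small std-only sketch shows why holding the TASKS lock across polling is a problem. TASKS holds u32s in place of futures, and try_spawn is a hypothetical stand-in for spawn. It uses try_lock, so the conflict shows up as a false return value instead of a hang. std's Mutex::lock documentation leaves re-locking from the same thread unspecified, but it says the call won't return (it might deadlock or panic). That is what a lock()-based spawn would run into.

```rust
use std::sync::{Mutex, TryLockError};

// Hypothetical global task list; u32s stand in for futures.
static TASKS: Mutex<Vec<u32>> = Mutex::new(Vec::new());

// Stand-in for a spawn() called by a task while it's being polled.
// try_lock lets us observe the conflict; a spawn using lock() would never return.
fn try_spawn(task: u32) -> bool {
    match TASKS.try_lock() {
        Ok(mut guard) => {
            guard.push(task);
            true
        }
        Err(TryLockError::WouldBlock) => false,
        Err(TryLockError::Poisoned(_)) => panic!("TASKS was poisoned"),
    }
}

fn main() {
    {
        // If the main loop held the TASKS lock for the whole polling pass...
        let _guard = TASKS.lock().unwrap();
        // ...a task calling spawn during that pass couldn't get the lock.
        assert!(!try_spawn(1));
    }
    // Once the guard is dropped, spawning works again.
    assert!(try_spawn(2));
    println!("{:?}", TASKS.lock().unwrap()); // prints [2]
}
```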
We'll get around this with two separate lists. We'll leave tasks where it is, as a local variable of the main loop, and add a global NEW_TASKS list. The spawn function will add tasks to NEW_TASKS:
static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());
fn spawn<F: Future<Output = ()>>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}
Now the main loop can… wait a minute, it doesn't compile:
error[E0277]: `(dyn Future<Output = ()> + 'static)` cannot be sent between threads safely
--> tasks_no_send_no_static.rs:43:19
|
43 | static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());
| ^^^^^^^^^^^^^^^^^^^^^ `(dyn Future<Output = ()> + 'static)` cannot be sent between threads
|
= help: the trait `Send` is not implemented for `(dyn Future<Output = ()> + 'static)`, which is required by
`Mutex<Vec<Pin<Box<(dyn Future<Output = ()> + 'static)>>>>: Sync`
Global variables in Rust have to be Sync, and a Mutex is Sync only if its contents are Send. So DynFuture has to promise that it implements Send:
type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
So now… nope, it still doesn't compile:
error[E0277]: `F` cannot be sent between threads safely
--> src/main.rs:46:36
|
46 | NEW_TASKS.lock().unwrap().push(Box::pin(future));
| ^^^^^^^^^^^^^^^^ `F` cannot be sent between threads safely
|
= note: required for the cast from `Pin<Box<F>>` to
`Pin<Box<(dyn futures::Future<Output = ()> + std::marker::Send + 'static)>>`
Fair enough. spawn has to make the same promise:
fn spawn<F: Future<Output = ()> + Send>(future: F) { … }
Happy now? Nope:
error[E0310]: the parameter type `F` may not live long enough
--> src/main.rs:46:36
|
46 | NEW_TASKS.lock().unwrap().push(Box::pin(future));
| ^^^^^^^^^^^^^^^^
| |
| the parameter type `F` must be valid for the static lifetime...
| ...so that the type `F` will meet its required lifetime bounds
Global variables have the 'static lifetime, which means they can't hold pointers to anything that might be freed. Boxed trait objects like DynFuture are 'static by default, but type parameters like F aren't. If spawn wants to put F in a global variable, it has to promise that F is 'static:
fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) { … }
Finally it compiles. That was a lot of ceremony just to make a global Vec, but let's think about what we actually built. NEW_TASKS isn't just a “Vec of futures”; it's a “Vec of thread-safe futures that don't contain any potentially dangling pointers.” Rust doesn't have a garbage collector, so dangling pointers would mean memory corruption bugs, and being able to rule them out is a good thing.
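Here is a self-contained sketch of NEW_TASKS and spawn with all three bounds in place, to show what those bounds buy us. The futures are only queued, never polled, and the println! calls inside them are placeholders. Moving owned data into an async move block satisfies 'static. Because NEW_TASKS is Sync, another thread can call spawn too. The commented-out lines show the kind of borrowing future that the 'static bound rejects.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
use std::thread;

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}

fn main() {
    // Owned data moved into the future satisfies 'static.
    let name = String::from("task 1");
    spawn(async move { println!("{name}") });

    // Because NEW_TASKS is Sync, other threads may spawn too.
    thread::spawn(|| spawn(async { println!("task 2") })).join().unwrap();

    // This would NOT compile: the future borrows `local`, so it isn't 'static.
    // let local = String::from("oops");
    // spawn(async { println!("{}", &local) });

    // Nothing has been polled yet; the futures are only queued.
    println!("{} tasks queued", NEW_TASKS.lock().unwrap().len()); // prints "2 tasks queued"
}
```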
So. Now the main loop can move tasks from NEW_TASKS into tasks. It doesn't take much code, but there are a couple of things to watch out for, and this time they're runtime bugs rather than compile errors. First, we need to poll new tasks before the main loop goes to sleep, so that they get a chance to register a wake-up time. Second, we need to make sure NEW_TASKS is unlocked while we poll, or we'll get the very deadlock we were trying to avoid. Here's the expanded main loop:
loop {
    // Poll each task, removing any that are Ready.
    let is_pending = |task: &mut DynFuture| {
        task.as_mut().poll(&mut context).is_pending()
    };
    tasks.retain_mut(is_pending);
    // Collect new tasks, poll them, and keep the ones that are Pending.
    loop {
        let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
            break;
        };
        // Polling this task could spawn more tasks, so it's important that
        // NEW_TASKS isn't locked here.
        if task.as_mut().poll(&mut context).is_pending() {
            tasks.push(task);
        }
    }
    // If there are no tasks left, we're done.
    if tasks.is_empty() {
        break;
    }
    // Otherwise handle WAKE_TIMES and sleep as in Part One...
    …
With that groundwork in place, we can define an async_main function and put it in charge of spawning tasks, instead of hardcoding the task list in main:
async fn async_main() {
    // The main loop currently waits for all tasks to finish.
    for n in 1..=10 {
        spawn(foo(n));
    }
}
fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut tasks: Vec<DynFuture> = vec![Box::pin(async_main())];
    …
It works! Because of the way we push and pop tasks on NEW_TASKS, the output order has changed. We could fix that, but let's leave it as is. It's a good reminder that, like threads, concurrent tasks can run in any order.
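To see where the new order comes from, here is a std-only sketch of the loop above, under a few assumptions. foo records its number in a hypothetical LOG instead of printing and sleeping, and NoopWake stands in for futures::task::noop_waker. run is an illustrative name for the main loop, which never sleeps.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());
// Hypothetical: records the order in which tasks run, instead of printing.
static LOG: Mutex<Vec<u32>> = Mutex::new(Vec::new());

fn spawn<F: Future<Output = ()> + Send + 'static>(future: F) {
    NEW_TASKS.lock().unwrap().push(Box::pin(future));
}

// Hypothetical no-op waker (std-only stand-in for futures::task::noop_waker).
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Simplified foo: no sleeping, it just logs that it ran.
async fn foo(n: u32) {
    LOG.lock().unwrap().push(n);
}

async fn async_main() {
    for n in 1..=3 {
        spawn(foo(n));
    }
}

// Illustrative main loop: busy-polls instead of sleeping.
fn run(main_task: DynFuture) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut tasks: Vec<DynFuture> = vec![main_task];
    loop {
        tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
        loop {
            // The MutexGuard here is a temporary, so it's dropped by the end of
            // this statement and NEW_TASKS is unlocked while `task` is polled.
            let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
                break;
            };
            if task.as_mut().poll(&mut context).is_pending() {
                tasks.push(task);
            }
        }
        if tasks.is_empty() {
            break;
        }
    }
}

fn main() {
    run(Box::pin(async_main()));
    println!("{:?}", LOG.lock().unwrap()); // prints [3, 2, 1]
}
```

Vec::pop takes the most recently spawned task first, so the tasks run in reverse spawn order. If we wanted spawn order, one option would be to make NEW_TASKS a VecDeque and take tasks with pop_front.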
JoinHandle
As we mentioned earlier, Tokio supports background tasks that don't keep the program from exiting, and it also lets tasks return values. Both features rely on tokio::task::spawn returning a tokio::task::JoinHandle, just as thread::spawn returns a thread::JoinHandle. To get the same features, we'll implement our own JoinHandle. So far, the only kind of blocking we've seen is sleep. Implementing JoinHandle will introduce a new kind of blocking, along with an interesting bug that comes with it.
A JoinHandle has to connect two tasks: one that's finishing, and one that's waiting for it to finish. The waiting task needs somewhere to put its Waker, so that the finishing task can invoke it. The finishing task, in turn, needs somewhere to put its return value T, so that the waiting task can receive it. We never need both at the same time, so we can use an enum. It has to be shared and mutable, so we'll wrap it in Arc and Mutex:
enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}
struct JoinHandle<T> {
    state: Arc<Mutex<JoinState<T>>>,
}
Waiting for a task means awaiting its JoinHandle, so JoinHandle needs to implement Future. The catch is that the waiting task wants to take ownership of the T inside JoinState::Ready(T). However, Arc<Mutex<JoinState>> only gives us a reference to the JoinState, and we can't move T out and “leave a hole” behind that reference. Instead, we'll replace the entire JoinState using mem::replace:
impl<T> Future for JoinHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<T> {
        let mut guard = self.state.lock().unwrap();
        // Use JoinState::Done as a placeholder, to take ownership of T.
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                // Replace the previous Waker, if any.
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}
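The mem::replace trick is easier to see in isolation. This sketch uses a simplified JoinState without the Awaited variant, and take_ready is a hypothetical helper. It swaps Done into the shared state, which hands us the old state by value, so the T can be moved out of it.

```rust
use std::mem;
use std::sync::{Arc, Mutex};

// Simplified JoinState, without the Awaited(Waker) variant.
enum JoinState<T> {
    Unawaited,
    Ready(T),
    Done,
}

// Hypothetical helper: move the T out from behind the Mutex if it's Ready.
fn take_ready<T>(state: &Arc<Mutex<JoinState<T>>>) -> Option<T> {
    let mut guard = state.lock().unwrap();
    // Swap in Done so that we own the old state, including any T inside it.
    match mem::replace(&mut *guard, JoinState::Done) {
        JoinState::Ready(value) => Some(value),
        other => {
            // Not ready (or already taken): put the old state back.
            *guard = other;
            None
        }
    }
}

fn main() {
    let state = Arc::new(Mutex::new(JoinState::Ready(String::from("hello"))));
    // String isn't Copy, so we really are moving it out.
    assert_eq!(take_ready(&state), Some(String::from("hello")));
    assert!(matches!(*state.lock().unwrap(), JoinState::Done));
    let pending: Arc<Mutex<JoinState<String>>> = Arc::new(Mutex::new(JoinState::Unawaited));
    assert_eq!(take_ready(&pending), None);
}
```

Unlike the JoinHandle above, take_ready puts a non-Ready state back instead of treating Done as unreachable. Option::take is the same pattern specialized to Option, with None as the placeholder.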
The futures passed to spawn don't know anything about JoinState, so we need a wrapper that stores their return values and invokes the Waker, if there is one:
async fn wrap_with_join_state<F: Future>(
    future: F,
    join_state: Arc<Mutex<JoinState<F::Output>>>,
) {
    let value = future.await;
    let mut guard = join_state.lock().unwrap();
    if let JoinState::Awaited(waker) = &*guard {
        waker.wake_by_ref();
    }
    *guard = JoinState::Ready(value);
}
Now spawn can create the JoinState and use our wrapper. This lets it accept any output type and return a JoinHandle:
fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let join_state = Arc::new(Mutex::new(JoinState::Unawaited));
    let join_handle = JoinHandle {
        state: Arc::clone(&join_state),
    };
    let task = Box::pin(wrap_with_join_state(future, join_state));
    NEW_TASKS.lock().unwrap().push(task);
    join_handle
}
In async_main we'll collect the JoinHandles and .await them, just as we did with Tokio tasks earlier:
async fn async_main() {
    let mut task_handles = Vec::new();
    for n in 1..=10 {
        task_handles.push(spawn(foo(n)));
    }
    for handle in task_handles {
        handle.await;
    }
}
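Putting the pieces together, here is a self-contained sketch. It combines the section's JoinState, JoinHandle, wrap_with_join_state, and spawn with a minimal main loop. block_on, NoopWake, and square are hypothetical, and block_on busy-polls without any Sleep or WAKE_TIMES. The loop already returns as soon as the main task is Ready, and tasks can return values through their JoinHandles. Because it re-polls every task each round instead of sleeping, it doesn't rely on wake-up times at all.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

static NEW_TASKS: Mutex<Vec<DynFuture>> = Mutex::new(Vec::new());

enum JoinState<T> {
    Unawaited,
    Awaited(Waker),
    Ready(T),
    Done,
}

struct JoinHandle<T> { state: Arc<Mutex<JoinState<T>>> }

impl<T> Future for JoinHandle<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.state.lock().unwrap();
        // Use JoinState::Done as a placeholder, to take ownership of T.
        match mem::replace(&mut *guard, JoinState::Done) {
            JoinState::Ready(value) => Poll::Ready(value),
            JoinState::Unawaited | JoinState::Awaited(_) => {
                *guard = JoinState::Awaited(context.waker().clone());
                Poll::Pending
            }
            JoinState::Done => unreachable!("polled again after Ready"),
        }
    }
}

async fn wrap_with_join_state<F: Future>(
    future: F,
    join_state: Arc<Mutex<JoinState<F::Output>>>,
) {
    let value = future.await;
    let mut guard = join_state.lock().unwrap();
    if let JoinState::Awaited(waker) = &*guard {
        waker.wake_by_ref();
    }
    *guard = JoinState::Ready(value);
}

fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let join_state = Arc::new(Mutex::new(JoinState::Unawaited));
    let join_handle = JoinHandle { state: Arc::clone(&join_state) };
    NEW_TASKS.lock().unwrap().push(Box::pin(wrap_with_join_state(future, join_state)));
    join_handle
}

// Hypothetical no-op waker: this loop re-polls everything, so wake-ups are ignored.
struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

// Hypothetical busy-polling main loop that returns once the main task is Ready.
fn block_on<T>(main_task: impl Future<Output = T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut context = Context::from_waker(&waker);
    let mut main_task = Box::pin(main_task);
    let mut other_tasks: Vec<DynFuture> = Vec::new();
    loop {
        if let Poll::Ready(value) = main_task.as_mut().poll(&mut context) {
            return value;
        }
        other_tasks.retain_mut(|task| task.as_mut().poll(&mut context).is_pending());
        loop {
            // let-else releases the lock before polling; `while let` would hold it.
            let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else { break };
            if task.as_mut().poll(&mut context).is_pending() {
                other_tasks.push(task);
            }
        }
    }
}

async fn square(n: u64) -> u64 { n * n }

fn main() {
    let sum = block_on(async {
        let handles: Vec<_> = (1..=4).map(|n| spawn(square(n))).collect();
        let mut sum = 0;
        for handle in handles {
            sum += handle.await;
        }
        sum
    });
    println!("{sum}"); // prints 30
}
```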
Now that we can explicitly wait for tasks, we want the main loop to exit as soon as the main task finishes. Let's pull the main task out of tasks and rename the remaining list to other_tasks:
fn main() {
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    let mut main_task = Box::pin(async_main());
    let mut other_tasks: Vec<DynFuture> = Vec::new();
    loop {
        // Poll the main task and exit immediately if it's done.
        if main_task.as_mut().poll(&mut context).is_ready() {
            return;
        }
        // Poll other tasks and remove any that are Ready.
        let is_pending = |task: &mut DynFuture| {
            task.as_mut().poll(&mut context).is_pending()
        };
        other_tasks.retain_mut(is_pending);
        // Handle NEW_TASKS and WAKE_TIMES...
There we go! We made a lot of changes at once, and fortunately it all compiles. It even almost works. The program prints the right output, but then it panics:
…
end 3
end 2
end 1
thread 'main' panicked at src/main.rs:143:50:
sleep forever?
This is the interesting bug we were waiting for.
Waker
The panic happens on this line, which has been in the main loop since Part One:
let next_wake = wake_times.keys().next().expect("sleep forever?");
The main loop is about to sleep, so it asks for the next wake-up time, but the WAKE_TIMES tree is empty. Previously we could assume that if any task returned Pending, there had to be at least one wake-up time, because the only source of blocking was Sleep. But now there's a second source: JoinHandle. When a JoinHandle is Pending, that might be because the other task is sleeping and has registered a wake-up time. It might also be because the other task is ready to return Ready and we just haven't polled it yet. That depends heavily on the order of tasks in the list. If a task near the front of the list is waiting on a task near the back, we can end up with Pending tasks and no scheduled wake-ups.
That's exactly what happened here. The main task is most likely blocked on the first JoinHandle. The main loop wakes up and polls the main task, and that JoinHandle is still Pending. Then it polls all the tasks in other_tasks. Each of them prints its completion message, signals its JoinHandle, and returns Ready. At that point we need to poll the main task again instead of going to sleep. How do we tell the main loop that? We could add another static flag, but there's a better option: we can use our Waker.
Since Part One we've been using futures::task::noop_waker to pass in a Waker that does nothing. When Sleep was the only source of blocking, our tasks had no way to unblock each other, and all we needed from a Waker was a placeholder to make the code compile. But things have changed. Our wrap_with_join_state function already invokes Wakers correctly when tasks finish, and it would be nice to know when that happens. Why not write our own Waker?
Waker implements From<Arc<W>>, where W is any type that implements the Wake trait (and is Send + Sync + 'static). Wake requires a single method, wake, which takes Arc<Self>. That's a little unusual, but otherwise it lets us do whatever we want. The simplest option is something like an Arc<Mutex<bool>> that we set to true whenever a task gets woken. That's not much different from a static flag, but it lets other people's futures call our Waker without knowing anything about how our main loop works. Here's our flag:
struct AwakeFlag(Mutex<bool>);
impl AwakeFlag {
    fn check_and_clear(&self) -> bool {
        let mut guard = self.0.lock().unwrap();
        let check = *guard;
        *guard = false;
        check
    }
}
impl Wake for AwakeFlag {
    fn wake(self: Arc<Self>) {
        *self.0.lock().unwrap() = true;
    }
}
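Here is a quick std-only check of how AwakeFlag behaves on its own, outside the main loop. Calling the Waker sets the flag, and check_and_clear reports it once and then resets it. Waker is Send + Sync, so a clone can also be woken from another thread. The Mutex<bool> matches the text; an AtomicBool with swap(false, ...) would be a lock-free alternative.

```rust
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};
use std::thread;

struct AwakeFlag(Mutex<bool>);

impl AwakeFlag {
    fn check_and_clear(&self) -> bool {
        let mut guard = self.0.lock().unwrap();
        let check = *guard;
        *guard = false;
        check
    }
}

impl Wake for AwakeFlag {
    fn wake(self: Arc<Self>) {
        *self.0.lock().unwrap() = true;
    }
}

fn main() {
    let awake_flag = Arc::new(AwakeFlag(Mutex::new(false)));
    let waker = Waker::from(Arc::clone(&awake_flag));

    // Nobody has called the Waker yet.
    assert!(!awake_flag.check_and_clear());

    // wake_by_ref sets the flag; check_and_clear reads it once and resets it.
    waker.wake_by_ref();
    assert!(awake_flag.check_and_clear());
    assert!(!awake_flag.check_and_clear());

    // A cloned Waker can be woken from another thread.
    let remote = waker.clone();
    thread::spawn(move || remote.wake()).join().unwrap();
    assert!(awake_flag.check_and_clear());
    println!("ok");
}
```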
At the start of main we create an AwakeFlag and then build a Waker from it:
fn main() {
    let awake_flag = Arc::new(AwakeFlag(Mutex::new(false)));
    let waker = Waker::from(Arc::clone(&awake_flag));
    let mut context = Context::from_waker(&waker);
    …
Finally, we can add the flag check to the main loop:
// Collect new tasks, poll them, and keep the ones that are Pending.
loop {
    let Some(mut task) = NEW_TASKS.lock().unwrap().pop() else {
        break;
    };
    if task.as_mut().poll(&mut context).is_pending() {
        other_tasks.push(task);
    }
}
// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
    continue;
}
// Otherwise handle WAKE_TIMES and sleep as in Part One...
It works! Our tasks are complete.
It's time to move on from sleeping and printing text. Next we'll look at real I/O and use spawn to handle network connections.