Async Rust, Part One: Futures
Async Rust in Three Parts
In the introduction we looked at some example async Rust code without explaining how it works. That left us with a few questions to think about: What are async functions and the "futures" they return? What does join_all do? How is tokio::time::sleep different from std::thread::sleep? To answer these questions, we'll translate each of these pieces into regular, non-async Rust code. We'll find that we can reproduce foo and join_all without much trouble, but sleep is more complicated. Let's get started.
Foo
As a reminder, the async function foo looked like this:
async fn foo(n: u64) {
    println!("start {n}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n}");
}
And here's what foo looks like as a regular, non-async function. The complete code is below; after it, we'll break it into parts and look at each one. It's a drop-in replacement for the async version, and main needs no changes. You can run it on the Playground.
fn foo(n: u64) -> Foo {
    let started = false;
    let duration = Duration::from_secs(1);
    let sleep = Box::pin(tokio::time::sleep(duration));
    Foo { n, started, sleep }
}

struct Foo {
    n: u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}
Let's go through it from top to bottom. foo is now a regular function that returns a Foo struct. It calls tokio::time::sleep, but it doesn't apply .await to the Sleep future that sleep returns. Instead, it stores that future in the Foo struct. We'll talk about Box::pin and Pin shortly. Foo becomes a future by implementing the Future trait. Here's that trait, from the standard library:
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
The trait itself is only a couple lines of code, but it brings in three new types: Pin, Context, and Poll. We're going to focus on Poll, so first a few words about Context and Pin, which we'll mostly set aside for later.
Context
Every call to Future::poll receives a Context from its caller. When one poll function calls another, as when Foo::poll calls Sleep::poll, it passes that Context along. That's all we need to know for now, until we get to the Wake section below.
Pin
Pin is a wrapper type for pointers. For now, with your permission, I'm going to pretend that Pin does nothing: that Box::pin is just Box::new, that Pin<Box<T>> is Box<T>, that Pin<&mut T> is &mut T, and that Pin<Box<T>>::as_mut is just Box::as_mut. Pin actually solves a very important problem in async Rust, but that problem will make more sense after we've had some practice writing futures. We'll come back to it in the Pin section below.
So, let's focus on Poll. It's an enum, and it looks like this:
pub enum Poll<T> {
    Ready(T),
    Pending,
}
The first job of poll is to return either Poll::Ready or Poll::Pending. Returning Ready means the future is finished, and it includes the Output value, if there is one. In that case, poll won't be called again. Returning Pending means the future isn't finished yet, and poll will be called again. You might ask: called again when? The short answer is that we have to be ready for anything; poll might get called over and over in a busy loop, and it needs to behave correctly. The long answer will come in the Wake section.
With that in mind, let's look at Foo's implementation of the Future trait and its poll function:
impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}
We've seen that poll's first job is to return Ready or Pending, and now we can see that it has a second job: doing the future's actual work. Foo's job is to print a couple of messages and sleep in between, and that's exactly what happens in poll. There's an important rule here: poll should do whatever work it can do quickly, but it must not make its caller wait for anything. It has to return either Ready or Pending right away. This rule is what lets us work on more than one future at a time; it's what lets futures run without blocking each other.
To follow this rule, Foo::poll has to rely on Sleep::poll returning quickly too. If we add some timing output, we can watch this happen. In the "important mistake" we made in the introduction, thread::sleep broke this rule, which made our futures run one at a time. If we do the same thing in Foo::poll, we'll see the same result: a blocking sleep in poll makes the caller wait, which blocks every other future too.
Foo uses the started flag to print its start message only once, no matter how many times poll gets called. It doesn't need an ended flag, though, because poll won't be called again after it returns Ready. The started flag turns Foo into a state machine with two possible states. In general, an async function needs a starting state, plus one more state for each of its .await points, so that poll can tell where to "resume execution". If we had more than two states, we could use an enum instead of a bool. When we write an async fn, the compiler builds this state machine for us, and that convenience is the main reason async/await exists as a language feature.
Join
Now that we've implemented a simple future, let's take a look at join_all. It might seem like join_all must be doing some magic behind the scenes that foo wasn't, but it turns out we already have everything we need to implement it. Here's join_all as a regular, non-async function:
fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
    JoinAll {
        futures: futures.into_iter().map(Box::pin).collect(),
    }
}

struct JoinAll<F> {
    futures: Vec<Pin<Box<F>>>,
}

impl<F: Future> Future for JoinAll<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        let is_pending = |future: &mut Pin<Box<F>>| {
            future.as_mut().poll(context).is_pending()
        };
        self.futures.retain_mut(is_pending);
        if self.futures.is_empty() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
Once again, our non-async join_all returns a struct that implements the Future trait, and the interesting work happens in Future::poll. Box::pin shows up again too, but let's keep ignoring it.
Inside poll, Vec::retain_mut does most of the heavy lifting. It's an ordinary Vec method that takes a closure, calls it on each element, and removes the elements for which it returned false. Here, that removes the futures that returned Ready, following the "don't poll them again" rule.
There's nothing really new here. From the outside, running all the futures at once looked like magic, but on the inside, all that's happening is a call to poll for each element of a Vec. This is the payoff of the rule we mentioned above: as long as we can trust each call to poll to return quickly, we can handle many futures in a single loop.
Note that we've taken a shortcut: we're ignoring the return values of the futures we poll. That works here because we only use join_all with foo, which doesn't return anything. The real join_all returns Vec<F::Output>, which takes a bit more bookkeeping. We'll leave that as an exercise for the reader, as they say.
Sleep
We're on a roll! It seems like we already have everything we need to write our own sleep:
fn sleep(duration: Duration) -> Sleep {
    let wake_time = Instant::now() + duration;
    Sleep { wake_time }
}

struct Sleep {
    wake_time: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
        if Instant::now() >= self.wake_time {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
Hmm. This compiles without errors, and the logic in poll looks right, but when we run it, it prints the start messages and then hangs. If we add some extra output, we can see that each Sleep gets polled once at the beginning and then never again. What are we missing?
It turns out that poll has three jobs, and so far we've only talked about two of them. First, poll does as much work as it can without blocking. Second, poll returns Ready if the work is finished, or Pending if there's more to do. But third, any time poll is about to return Pending, it needs to "schedule a wakeup". That's what we forgot.
The reason we haven't run into this before is that Foo and JoinAll only return Pending when another future has already returned Pending to them, which means a wakeup has already been scheduled. But Sleep is what we call a "leaf future". It has no other futures below it, so it has to schedule its own wakeup.
Wake
Time to take a closer look at Context. If we call context.waker(), we get a Waker. Calling either waker.wake() or waker.wake_by_ref() asks for the current future to be polled again. The two methods do the same thing, and we'll use whichever is more convenient. The simplest thing we can try is asking for another poll every time we return Pending:
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        context.waker().wake_by_ref();
        Poll::Pending
    }
}
This prints the right output and exits at the right time, so the "sleeping forever" problem is solved, but we've traded it for a "busy loop" problem. This code calls poll over and over as fast as it can, burning 100% of a CPU core until it's done. We can see that indirectly by counting poll calls, or measure it directly with profiling tools like perf on Linux.
What we want is to invoke the Waker later, when it's actually time to wake up, but we can't use thread::sleep inside poll. One option is to spawn another thread that does the thread::sleep for us and then calls wake. Spawning a thread on every call to poll would run into the too-many-threads problem from the introduction, but we could work around that by sharing one thread and sending it Wakers over a channel. That's actually a viable implementation, but something about it feels off. The main thread of our program already spends most of its time sleeping. Why do we need two sleeping threads? Isn't there some way to wake the main thread at a specific time?
Well, to be fair, there is such a way: that's exactly what tokio::time::sleep is for. But if we really want to write our own sleep, without spawning an extra thread for it, then we'll also need to write our own main.
Main
To call poll from main, we need a Context to pass in. We can create one with Context::from_waker, but for that we need a Waker. There are a few different ways to create one, but for now we just need a placeholder, so we'll use a helper function called noop_waker. ("Noop", "no-op", and "nop" are all short for "no operation".) Once we have a Context, we can call poll in a loop:
fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // Busy loop!
    }
}
It works! We still have the same busy-loop problem as above, but before we fix it, let's make one more important mistake:
Since this version of our main loop never stops polling, and since our Waker does nothing, you might wonder whether the Waker call in Sleep::poll is even necessary. Surprisingly, it is. If we delete that line, the code still seems to work, but if we bump the number of futures from ten to a hundred, our futures never wake up. Even though our Waker does nothing, there are other Wakers in this program. When futures::future::JoinAll has many child futures (in futures v0.3.30, the exact threshold is 31), it creates its own Wakers so that it can avoid re-polling children that didn't ask to be woken up. That's more efficient than polling all of them every time, but it also means that children who never invoke their Waker never get polled again. This is why a future that returns Pending must always arrange for its Waker to be invoked.
Okay, back to main. Let's fix the busy-loop problem. We want main to thread::sleep until the next wakeup time, which means we need a way for Sleep::poll to send its Waker and its wake time to main. We'll use a global variable for that, wrapped in a Mutex so that safe code can modify it:
static WAKE_TIMES: Mutex<BTreeMap<Instant, Vec<Waker>>> =
    Mutex::new(BTreeMap::new());
This is a sorted map from wake times to Wakers. Note that the value type is Vec<Waker> rather than just Waker, because more than one Waker can be registered for the same Instant. That's unlikely on Linux and macOS, where the resolution of Instant::now() is measured in nanoseconds, but on Windows its resolution is 15.6 milliseconds.
Sleep::poll can insert its Waker into this map using BTreeMap::entry:
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let wakers_vec = wake_times.entry(self.wake_time).or_default();
        wakers_vec.push(context.waker().clone());
        Poll::Pending
    }
}
After each round of polling, our main loop can read the first key in the map to find the earliest wake time, and then thread::sleep until then, which fixes the busy-loop problem. After waking up, it invokes every Waker whose wake time has arrived, before looping around and polling again:
fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // The joined future is Pending. Sleep until the next wake time.
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let next_wake = wake_times.keys().next().expect("sleep forever?");
        thread::sleep(next_wake.saturating_duration_since(Instant::now()));
        // We just woke up. Invoke all the Wakers whose time has come.
        while let Some(entry) = wake_times.first_entry() {
            if *entry.key() <= Instant::now() {
                entry.remove().into_iter().for_each(Waker::wake);
            } else {
                break;
            }
        }
        // Loop and poll again.
    }
}
It works! We've fixed the busy-loop problem without spawning any extra threads. That's everything we need to write our own sleep. This is a natural stopping point for Part One. In Part Two, we'll expand our main loop to implement "tasks". But now that we understand how futures work, we can also cover a few bonus topics that will be useful in the following parts, even though they aren't strictly required for them.
Bonus: Pin
Now that we know how to translate an async fn into a Future struct, we can say a little more about Pin and the problem it solves. Let's imagine that our async fn foo used a reference internally for some reason:
async fn foo(n: u64) {
    let n_ref = &n;
    println!("start {n_ref}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n_ref}");
}
That compiles and runs fine, and it looks like perfectly ordinary Rust. But what would the same change look like in our Foo future struct?
struct Foo {
    n: u64,
    n_ref: &u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}
The code doesn't compile:
error[E0106]: missing lifetime specifier
--> src/main.rs:3:12
|
3 | n_ref: &u64,
| ^ expected named lifetime parameter
What should the lifetime of n_ref be? The short answer is that there's no good answer. Self-referential borrows are generally not allowed in Rust structs, and there's no syntax for what n_ref is trying to do. If there were, we'd have to answer some hard questions about when we're allowed to mutate n and when we're allowed to move Foo.
But then, what Future type did the compiler generate for the async fn foo above? It turns out that Rust does unsafe things internally to represent lifetimes that can't be written out, like n_ref's. The job of the Pin pointer wrapper is to encapsulate that unsafety, so that we can write custom futures like JoinAll in safe code. Pin works together with the Unpin auto trait, which is implemented for most concrete types, but not for the compiler-generated futures returned by async functions. Operations that could let us move a pinned object are either restricted to Unpin types (like DerefMut) or marked unsafe (like get_unchecked_mut). It turns out that our liberal use of Box::pin in the examples above meant that all of our futures were automatically Unpin, so DerefMut worked on our Pin<&mut Self> references, and we could mutate fields like self.started and self.futures without thinking about it.
That's where we'll end our discussion of Pin, since the details aren't needed for tasks (Part Two) or I/O (Part Three). But if you want the whole story, start with this post by the author of Pin, and then read the official Pin documentation.
Bonus: Cancel
Async functions look and feel a lot like regular functions, but they have subtle superpowers, and there's one more we haven't mentioned yet: cancellation. When we call a regular function in blocking code, there's no general way to cancel that call once it's started. But we can cancel any future simply by… never polling it again. Tokio provides tokio::time::timeout for this, and we already have everything we need to write our own version:
struct Timeout<F> {
    sleep: Pin<Box<tokio::time::Sleep>>,
    inner: Pin<Box<F>>,
}

impl<F: Future> Future for Timeout<F> {
    type Output = Option<F::Output>;

    fn poll(
        mut self: Pin<&mut Self>,
        context: &mut Context,
    ) -> Poll<Self::Output> {
        // Check whether the inner future is finished.
        if let Poll::Ready(output) = self.inner.as_mut().poll(context) {
            return Poll::Ready(Some(output));
        }
        // Check whether time is up.
        if self.sleep.as_mut().poll(context).is_ready() {
            return Poll::Ready(None);
        }
        // Still waiting.
        Poll::Pending
    }
}

fn timeout<F: Future>(duration: Duration, inner: F) -> Timeout<F> {
    Timeout {
        sleep: Box::pin(tokio::time::sleep(duration)),
        inner: Box::pin(inner),
    }
}
This wrapper works with any future. There's nothing like it for regular functions.
Bonus: Recursion
One superpower that async functions lack is recursion. If we try to make an async function call itself:
async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        n * factorial(n - 1).await
    }
}
We will get a compilation error:
error[E0733]: recursion in an async fn requires boxing
--> recursion.rs:1:1
|
1 | async fn factorial(n: u64) -> u64 {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
…
5 | n * factorial(n - 1).await
| ---------------------- recursive call here
|
= note: a recursive `async fn` must introduce indirection such as `Box::pin` to avoid an infinitely sized future
When regular functions call each other, they allocate space dynamically on the call stack. But when async functions .await each other, they compile into structs containing other structs, whose sizes are static. If an async function wants to call itself, it has to put the recursive future into a Box before awaiting it:
async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        let recurse = Box::pin(factorial(n - 1));
        n * recurse.await
    }
}
This works, but requires memory allocation on the heap.
And with that, let's wrap up Part One and move on to "tasks".