Asynchronous Rust. Part One: Futures

Asynchronous Rust in three parts

In the introduction we looked at an example of asynchronous Rust without any explanation of how it works. That left us with a few questions to think about: What are async functions and the “futures” they return? What does join_all do? And how is tokio::time::sleep different from std::thread::sleep?

To answer these questions, we're going to translate each of these pieces into regular, non-asynchronous Rust code. We'll soon find that we can reproduce foo and join_all fairly easily, but sleep is a little more complicated. Let's get started.

Foo

Let me remind you that the asynchronous function foo looked like this:

async fn foo(n: u64) {
    println!("start {n}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n}");
}

Playground #1

And here's what foo looks like as a regular, non-asynchronous function. The whole thing comes first; then we'll break it into parts and go through each of them. It's a drop-in replacement for the previous function, and main needs no changes. You can run it on the Playground.

fn foo(n: u64) -> Foo {
    let started = false;
    let duration = Duration::from_secs(1);
    let sleep = Box::pin(tokio::time::sleep(duration));
    Foo { n, started, sleep }
}

struct Foo {
    n: u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}

Playground #2

Let's go from top to bottom. foo is now a regular function that returns a Foo struct. It calls tokio::time::sleep, but it doesn't apply .await to the Sleep future that sleep returns. Instead, it stores that future in the Foo struct. We'll talk about Box::pin and Pin<Box<Sleep>> a little further down.

Foo becomes a future by implementing the Future trait. Here's the trait definition from the standard library:

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

The trait itself is only a couple of lines, but it introduces three new types: Pin, Context, and Poll. We're going to focus on Poll, so first a few words about Context and Pin, after which we'll set them aside for later.

Every call to Future::poll receives a Context from its caller. When one poll function calls another, for example when Foo::poll calls Sleep::poll, it passes that Context along. For now that's all we need to know, until we get to the Wake section below.

Pin is a wrapper type for pointers. For now, if you'll allow me, I'm going to pretend that Pin does nothing: that Box::pin is just Box::new, that Pin<Box<T>> is just Box<T>, that Pin<&mut T> is just &mut T, and that Pin<Box<T>>::as_mut is just Box::as_mut. Pin actually solves a very important problem in asynchronous Rust, but it will make more sense once we've had some practice writing futures. We'll come back to it in the Pin section.

So let's focus on Poll. It's an enum, and it looks like this:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

The first job of poll is to return either Poll::Ready or Poll::Pending. Returning Ready means the future has finished its work and carries the Output value, if there is one; in that case poll won't be called again. Returning Pending means the future hasn't finished yet and poll will be called again.

You might ask: called again when, exactly? The short answer is that we have to be prepared for anything. poll might be called over and over in a busy loop, and it needs to behave correctly. The long answer comes in the Wake section.

Let's take another look at the implementation of Future for Foo and its poll function:

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}

Playground #2

We said that the first job of poll is to return Ready or Pending, and now we can see its second job: doing the future's actual work. Foo's job is to print a couple of messages with a sleep in between, and that's exactly what happens inside poll.

There's an important trade-off here: poll should do whatever work it can do quickly, but it must not make its caller wait. It has to return either Ready or Pending right away. This contract is what lets us drive more than one future at a time, so they make progress without blocking each other.

To follow this rule, Foo::poll has to rely on Sleep::poll returning quickly. If we add some timing output, we can watch that happening. In the “important mistake” we made in the introduction, thread::sleep broke this rule, which made our futures run one after another. If we do the same thing in Foo::poll, we see the same result: a blocking sleep inside poll forces the caller to wait, which blocks every other future.
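For example, a minimal sketch of that kind of timing output (my own addition, not from the original Playground) wraps the inner poll call and reports how long it took:

impl Future for Foo {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if !self.started {
            println!("start {}", self.n);
            self.started = true;
        }
        // Time the inner poll. With tokio's Sleep this prints microseconds; with a
        // blocking thread::sleep in its place it would print a full second.
        let poll_start = std::time::Instant::now();
        let result = self.sleep.as_mut().poll(context);
        println!("polling sleep for {} took {:?}", self.n, poll_start.elapsed());
        if result.is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}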

Foo uses the started flag to print the start message only once, no matter how many times poll is called. An ended flag isn't needed, because poll won't be called again after it returns Ready. The started flag turns Foo into a state machine with two possible states. In general, an async function needs an initial state plus one more state for each .await point, so that poll knows where to “resume execution.” With more than two states we would use an enum instead of a bool. When you write an async fn, the compiler builds this state machine for you, and that convenience is the main reason async/await exists as a language feature.
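To make that concrete, here's a sketch of my own (the names FooTwice and foo_twice are hypothetical, not from the original post) of a future with two .await points, which needs an enum to remember which sleep it should resume from:

fn foo_twice(n: u64) -> FooTwice {
    FooTwice {
        n,
        state: FooTwiceState::NotStarted,
        sleep: Box::pin(tokio::time::sleep(Duration::from_secs(1))),
    }
}

enum FooTwiceState {
    NotStarted,
    FirstSleep,
    SecondSleep,
}

struct FooTwice {
    n: u64,
    state: FooTwiceState,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

impl Future for FooTwice {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        if matches!(self.state, FooTwiceState::NotStarted) {
            println!("start {}", self.n);
            self.state = FooTwiceState::FirstSleep;
        }
        if matches!(self.state, FooTwiceState::FirstSleep) {
            if self.sleep.as_mut().poll(context).is_pending() {
                return Poll::Pending;
            }
            println!("middle {}", self.n);
            // Start a fresh timer for the second sleep and advance the state machine.
            self.sleep = Box::pin(tokio::time::sleep(Duration::from_secs(1)));
            self.state = FooTwiceState::SecondSleep;
        }
        if self.sleep.as_mut().poll(context).is_pending() {
            return Poll::Pending;
        }
        println!("end {}", self.n);
        Poll::Ready(())
    }
}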

Join

Now that we've figured out how to implement a simple future, let's take a look at join_all. It might seem like join_all does some magic behind the scenes that foo doesn't, but it turns out we already have everything we need to implement it. Here's join_all as a regular, non-asynchronous function:

fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
    JoinAll {
        futures: futures.into_iter().map(Box::pin).collect(),
    }
}

struct JoinAll<F> {
    futures: Vec<Pin<Box<F>>>,
}

impl<F: Future> Future for JoinAll<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        let is_pending = |future: &mut Pin<Box<F>>| {
            future.as_mut().poll(context).is_pending()
        };
        self.futures.retain_mut(is_pending);
        if self.futures.is_empty() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

Playground #3

Once again, our non-asynchronous join_all returns a struct that implements the Future trait, with the real work happening in Future::poll. Box::pin shows up again, but let's keep ignoring it.

Inside poll, Vec::retain_mut does all the hard work. It's an ordinary Vec method that takes a closure, calls it on each element, and removes the elements for which it returns false. That drops every future that returned Ready, following the “don't poll them again” rule.
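If retain_mut is unfamiliar, here's what it does on its own, away from any futures (a tiny illustration of mine):

let mut v = vec![1, 2, 3, 4, 5];
// The closure may mutate each element and decides which elements to keep.
v.retain_mut(|x| {
    *x *= 10;
    *x < 40
});
assert_eq!(v, vec![10, 20, 30]);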

There's nothing new here. From the outside, running all the futures at the same time looked like magic, but all that happens inside is a call to poll for each element of the Vec. This is the other side of the trade-off mentioned above: if we can trust that each call to poll returns quickly, we can drive many futures in one loop.

Note that we've taken a shortcut here: we ignore the output of the futures we poll. That works in this case because we only use join_all with foo, which returns nothing. The real join_all returns Vec<F::Output>, which takes a bit more bookkeeping. We'll leave that as an exercise for the reader, as they say.

Sleep

We're on the right track! It seems we already have everything we need to implement our own sleep:

fn sleep(duration: Duration) -> Sleep {
    let wake_time = Instant::now() + duration;
    Sleep { wake_time }
}

struct Sleep {
    wake_time: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
        if Instant::now() >= self.wake_time {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

Playground #4

Hmm. The code compiles without errors, and the logic in poll looks correct, but when we run it, it prints the “start” messages and then hangs. If you add some extra output statements, you can see that each Sleep gets polled once at the beginning and never again. What are we missing?

It turns out poll has three jobs, and so far we've only covered two. First, poll does whatever work it can without blocking. Second, poll returns Ready if it's finished, or Pending if it isn't. And third, whenever poll is about to return Pending, it needs to schedule a wakeup. That's what we forgot.

The reason we haven't run into this before is that Foo and JoinAll only return Pending when another future has already returned Pending to them, which means a wakeup is already scheduled. But Sleep is what we call a “leaf future”: it has no other futures below it, so it has to arrange its own wakeup.

Wake

It's time to take a closer look at Context. Calling context.waker() gives us a Waker. By calling waker.wake() or waker.wake_by_ref(), a future asks to be polled again. The two methods do the same thing, and we'll use whichever is more convenient.

The simplest thing we can try is to ask to be polled again every time we return Pending:

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        context.waker().wake_by_ref();
        Poll::Pending
    }
}

Playground #5

This code prints the right output and exits at the right time, so the “sleeping forever” problem is solved, but we've traded it for a “busy loop” problem. The program now calls poll over and over as fast as it can, burning 100% of a CPU core until it finishes. We can see this indirectly by counting poll calls, or measure it directly with tools like perf on Linux.
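One way to do that counting (a sketch of mine, not part of the original Playground; the name POLL_COUNT is hypothetical) is a global atomic counter that Sleep::poll increments and main prints at the end:

use std::sync::atomic::{AtomicU64, Ordering};

static POLL_COUNT: AtomicU64 = AtomicU64::new(0);

// The same Sleep::poll as above, plus the counter.
impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
        POLL_COUNT.fetch_add(1, Ordering::Relaxed);
        if Instant::now() >= self.wake_time {
            Poll::Ready(())
        } else {
            context.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// ...and at the very end of main:
// println!("poll was called {} times", POLL_COUNT.load(Ordering::Relaxed));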

We need to invoke the Waker later, when it's actually time to wake up, but we can't use thread::sleep inside poll. One option is to spawn another thread that does the thread::sleep for us and then calls wake. If we did that on every poll call, we'd run into the too-many-threads problem from the introduction, but we could work around it by spawning one shared thread and using a channel to send Wakers to it. That's actually a viable implementation, but something about it bothers us: the main thread of our program already spends most of its time asleep. Why do we need two sleeping threads? Isn't there some way to wake our main thread at a specific time?

Well, to be fair, there is such a way; that's exactly what tokio::time::sleep is for. But if we really want to write our own sleep, and we don't want to spawn an extra thread for it, then we also need to write our own main.

Main

To call poll from main, we need a Context to pass to it. We can create one with Context::from_waker, but for that we need a Waker. There are several ways to make one, and for now we only need a stub, so we'll use a helper function called noop_waker (“noop”, “no-op”, and “nop” are all short for “no operation”). Once we have a Context, we can call poll in a loop:

fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // Busy loop!
    }
}

Playground #6

It works! Although we still have the same “busy loop” problem as before. But before we solve it, we need to make one more important mistake:

Because this version of our main loop never stops polling, and because our Waker does nothing, you might wonder whether the Waker call in Sleep::poll is necessary at all. Surprisingly, it really is. If you delete that line, the code seems to work at first, but if you raise the number of futures from ten to a hundred, they never wake up. What's going on is that, even though our Waker does nothing, there are other Wakers in the program. When futures::future::JoinAll has many child futures (in futures v0.3.30 the exact threshold is 31), it creates its own Wakers so it can avoid re-polling children that didn't ask to be woken. That's more efficient than polling all of them every time, but it also means that a child that never invokes its Waker will never be polled again. This is why a future that returns Pending must always arrange for its Waker to be invoked.

Okay, back to main. Let's fix the “busy loop” problem. We want main to thread::sleep until the next wake time, which means Sleep::poll needs a way to pass its Waker and its wake time to main. We'll use a global variable, wrapped in a Mutex so that safe code can modify it:

static WAKE_TIMES: Mutex<BTreeMap<Instant, Vec<Waker>>> =
    Mutex::new(BTreeMap::new());

Playground #7

This is a sorted map from wake time to Wakers. Note that the value type is Vec<Waker> rather than just Waker, because more than one Waker might share the same wake-up Instant. That's unlikely on Linux and macOS, where Instant::now() has nanosecond resolution, but on Windows its resolution is 15.6 milliseconds.

Sleep::poll can insert its Waker into this map using BTreeMap::entry:

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
    if Instant::now() >= self.wake_time {
        Poll::Ready(())
    } else {
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let wakers_vec = wake_times.entry(self.wake_time).or_default();
        wakers_vec.push(context.waker().clone());
        Poll::Pending
    }
}

Playground #7

After polling, our main loop can read the first key of the map to find the earliest wake time. Then it can thread::sleep until that time, which eliminates the “busy loop” problem. Next it invokes every Waker whose wake time has arrived, before looping around and polling again:

fn main() {
    let mut futures = Vec::new();
    for n in 1..=10 {
        futures.push(foo(n));
    }
    let mut joined_future = Box::pin(future::join_all(futures));
    let waker = futures::task::noop_waker();
    let mut context = Context::from_waker(&waker);
    while joined_future.as_mut().poll(&mut context).is_pending() {
        // The joined future is Pending. Sleep until the next wake time.
        let mut wake_times = WAKE_TIMES.lock().unwrap();
        let next_wake = wake_times.keys().next().expect("sleep forever?");
        thread::sleep(next_wake.saturating_duration_since(Instant::now()));
        // We just woke up. Invoke all the Wakers whose time has come.
        while let Some(entry) = wake_times.first_entry() {
            if *entry.key() <= Instant::now() {
                entry.remove().into_iter().for_each(Waker::wake);
            } else {
                break;
            }
        }
        // Loop and poll again.
    }
}

Playground #7

It works! We've solved the “busy loop” problem without spawning any extra threads. That's all it takes to write our own sleep.

That more or less wraps up Part One. In Part Two we'll extend our main loop to implement “tasks”. But now that we understand how futures work, we can cover a few bonus topics that will be useful in the following parts, even though they aren't strictly required to understand them.

Bonus: Pin

Now that we know how to translate an async fn into a struct that implements Future, we can say a little more about Pin and the problem it solves. Imagine that our async fn foo used a reference internally for some reason:

async fn foo(n: u64) {
    let n_ref = &n;
    println!("start {n_ref}");
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("end {n_ref}");
}

Playground #8

This code compiles and runs fine, and it looks like perfectly ordinary Rust. But what would the same change look like in our hand-written Foo future?

struct Foo {
    n: u64,
    n_ref: &u64,
    started: bool,
    sleep: Pin<Box<tokio::time::Sleep>>,
}

Playground #9

The code doesn't compile:

error[E0106]: missing lifetime specifier
 --> src/main.rs:3:12
  |
3 |     n_ref: &u64,
  |            ^ expected named lifetime parameter

What should the lifetime of n_ref be? The short answer is that there's no good answer. Self-referential borrows aren't generally allowed in Rust structs, and there's no syntax for what n_ref is trying to do. If there were, we'd have to answer some hard questions about when we're allowed to mutate n and when we're allowed to move Foo.

But then what Future type does the compiler generate for the async fn foo above? It turns out Rust does some unsafe things internally to make these inexpressible lifetimes, like the one n_ref would need, work. The job of the Pin pointer wrapper is to encapsulate that unsafety, so that we can write custom futures like JoinAll in safe code. Pin works together with the auto trait Unpin, which is implemented for most concrete types but not for the compiler-generated futures returned by async functions. Operations that could let us move a pinned object are either restricted to Unpin types (like DerefMut) or marked unsafe (like get_unchecked_mut). Our liberal use of Box::pin in the examples above meant that all of our futures were automatically Unpin, so DerefMut worked on our Pin<&mut Self> references, and we could mutate fields like self.started and self.futures without thinking about it.
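As a tiny illustration (the helper name mark_started is mine), the following compiles only because Foo is Unpin: all of its fields are Unpin, including Pin<Box<Sleep>>, since Box<Sleep> is always Unpin.

fn mark_started(mut foo: Pin<&mut Foo>) {
    // This field access goes through Pin's DerefMut impl, which is only available
    // because Foo: Unpin. For a !Unpin future we'd need unsafe code or pin
    // projection instead.
    foo.started = true;
}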

This is where we'll end our discussion of Pin, since the finer details aren't needed for tasks (Part Two) or I/O (Part Three). But if you want the whole story, start with this post by the author of Pin and then read the official Pin documentation.

Bonus: Cancellation

Asynchronous functions look and feel like regular functions, but with special superpowers. There is one more superpower that we haven't mentioned.

That superpower is cancellation. When we call a regular function in blocking code, there's no way to put a time limit on the call. But we can cancel any future simply by… not polling it again. Tokio provides tokio::time::timeout for this, and we already have everything we need to build our own version:

struct Timeout<F> {
    sleep: Pin<Box<tokio::time::Sleep>>,
    inner: Pin<Box<F>>,
}

impl<F: Future> Future for Timeout<F> {
    type Output = Option<F::Output>;

    fn poll(
        mut self: Pin<&mut Self>,
        context: &mut Context,
    ) -> Poll<Self::Output> {
        // Check whether the inner future is finished.
        if let Poll::Ready(output) = self.inner.as_mut().poll(context) {
            return Poll::Ready(Some(output));
        }
        // Check whether time is up.
        if self.sleep.as_mut().poll(context).is_ready() {
            return Poll::Ready(None);
        }
        // Still waiting.
        Poll::Pending
    }
}

fn timeout<F: Future>(duration: Duration, inner: F) -> Timeout<F> {
    Timeout {
        sleep: Box::pin(tokio::time::sleep(duration)),
        inner: Box::pin(inner),
    }
}

Playground #10

This wrapper works with any future. There's nothing like it for regular, blocking functions.
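Here's a sketch of how we might use it with foo from earlier, assuming the tokio runtime from the introduction:

#[tokio::main]
async fn main() {
    // Give foo(1) only half a second. Its one-second sleep doesn't finish in time,
    // so the Timeout future resolves to None and "end 1" is never printed.
    let result = timeout(Duration::from_millis(500), foo(1)).await;
    assert!(result.is_none());
}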

Bonus: Recursion

The one superpower async functions are missing is recursion. If an async function tries to call itself:

async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        n * factorial(n - 1).await
    }
}

Playground #11

We will get a compilation error:

error[E0733]: recursion in an async fn requires boxing
 --> recursion.rs:1:1
  |
1 | async fn factorial(n: u64) -> u64 {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
…
5 |         n * factorial(n - 1).await
  |             ---------------------- recursive call here
  |
  = note: a recursive `async fn` must introduce indirection such as `Box::pin` to avoid an infinitely sized future

When regular functions call each other, they allocate space dynamically on the call stack. But when async functions .await each other, they compile into structs containing other structs, whose sizes are fixed at compile time. So when an async function calls itself, it has to put the recursive future in a Box before .awaiting it:

async fn factorial(n: u64) -> u64 {
    if n == 0 {
        1
    } else {
        let recurse = Box::pin(factorial(n - 1));
        n * recurse.await
    }
}

This works, but requires memory allocation on the heap.
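As a quick sanity check (a sketch, assuming any async context such as a tokio main):

#[tokio::main]
async fn main() {
    // 10! = 3,628,800. Each level of recursion allocates one boxed future on the heap.
    assert_eq!(factorial(10).await, 3_628_800);
}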

And with that, let's wrap up and move on to “tasks”.
