Asynchronous Rust in three parts. Part Three: IO

Of course, async/await wasn't invented just for sleeping. Our focus from the beginning has been input/output (I/O), network I/O in particular. Armed with futures and tasks, we can now move on to real-world examples.

Let's go back to regular, non-async Rust for a minute. We'll start with a simple server and a client that talks to it. Using threads, we'll combine the server and several clients into one example that can run on the Playground. Once that combination works, we'll convert it to async, using the main loop we wrote in Part Two.

Here is our simple server:

use std::io::{self, Write};
use std::net::TcpListener;
use std::thread;
use std::time::Duration;

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    let mut n = 1;
    loop {
        let (mut socket, _) = listener.accept()?;
        let start_msg = format!("start {n}\n");
        socket.write_all(start_msg.as_bytes())?;
        thread::sleep(Duration::from_secs(1));
        let end_msg = format!("end {n}\n");
        socket.write_all(end_msg.as_bytes())?;
        n += 1;
    }
}

Playground #1

The server listens on port 8000. For each connection, it writes a start message, sleeps for one second, and then writes an end message.

Below is the client for our simple server:

use std::io;
use std::net::TcpStream;

fn main() -> io::Result<()> {
    let mut socket = TcpStream::connect("localhost:8000")?;
    io::copy(&mut socket, &mut io::stdout())?;
    Ok(())
}

Playground #2

The client opens a connection to the server and copies the bytes it receives to standard output as they arrive. It doesn't sleep explicitly, but it still takes a second to run, because the server takes a second to finish its response. Under the hood, io::copy is a convenient wrapper around the standard Read::read and Write::write methods, and read blocks until data arrives.
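
As a rough sketch, the loop inside io::copy does something like the following (the real implementation is more careful about buffering; the buffer size and structure here are illustrative assumptions):

use std::io::{self, Read, Write};

// A simplified picture of io::copy: read into a buffer until EOF, writing
// each chunk to the output. Read::read blocks until some data is available.
fn copy_sketch(
    reader: &mut impl Read,
    writer: &mut impl Write,
) -> io::Result<u64> {
    let mut buf = [0u8; 8192];
    let mut total = 0;
    loop {
        match reader.read(&mut buf) {
            Ok(0) => return Ok(total), // EOF
            Ok(n) => {
                writer.write_all(&buf[..n])?;
                total += n as u64;
            }
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}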

These two programs can't talk to each other across Playgrounds. You might want to run them on your own computer, or better yet, on two different machines on your WiFi network. If you haven't done this before, it can be quite interesting to see them work over a real network. It may also be helpful to review the web server project from Chapter 20 of the Book.

Threads

Let's get this running on the Playground by combining the client and server into one program. Since both of them block, they'll need to run on separate threads. We'll rename their main functions to client_main and server_main, and while we're at it, we'll run ten clients at the same time:

fn main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    // Start the server on a background thread.
    thread::spawn(|| server_main(listener).unwrap());
    // Run ten clients on ten different threads.
    let mut client_handles = Vec::new();
    for _ in 1..=10 {
        client_handles.push(thread::spawn(client_main));
    }
    for handle in client_handles {
        handle.join().unwrap()?;
    }
    Ok(())
}

Playground #3

The code works on the Playground, but it takes ten seconds. Although the clients run in parallel, the server handles them one at a time. Let's have the server spawn a new thread for each incoming connection:

fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    socket.write_all(start_msg.as_bytes())?;
    thread::sleep(Duration::from_secs(1));
    let end_msg = format!("end {n}\n");
    socket.write_all(end_msg.as_bytes())?;
    Ok(())
}

fn server_main(listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = listener.accept()?;
        thread::spawn(move || one_response(socket, n).unwrap());
        n += 1;
    }
}

Playground #4

The code still works, and now it takes just one second. This is exactly what we wanted. Now we're ready for our final project: extending the main loop from Part Two and converting this example to async.

We have two main problems to solve. First, we need I/O functions that return immediately without blocking, even if input hasn't arrived yet, so that we can call them in Future::poll. Second, when all of our tasks are waiting on input, we need to put the program to sleep instead of busy-waiting, and we need a way to wake it back up when input arrives.

Non-blocking

The standard library has a solution to the first problem: TcpListener and TcpStream both have a set_nonblocking method, which makes accept, read, and write return ErrorKind::WouldBlock instead of blocking.
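
Here's a minimal sketch of that behavior on its own, outside of any future (the port and the printed messages are just for illustration):

use std::io;
use std::net::TcpListener;

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    listener.set_nonblocking(true)?;
    match listener.accept() {
        // A client was already waiting to connect.
        Ok((_stream, addr)) => println!("accepted a connection from {addr}"),
        // No client is waiting. accept returned immediately instead of blocking.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            println!("nothing to accept yet");
        }
        Err(e) => return Err(e),
    }
    Ok(())
}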

Technically, set_nonblocking by itself is enough for async I/O. Without solving the second problem, we'll burn 100% of the CPU in a busy loop until the job is done, but our output will still be correct, and we can get some prep work out of the way before moving on to the harder part.

When we wrote Foo, JoinAll, and Sleep in Part One, each of them required a struct definition, a poll function, and a constructor function. To cut down on boilerplate this time, we'll use std::future::poll_fn, which takes a freestanding poll closure and generates the rest of the future.
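
For example, here's a small future written with poll_fn, with no struct or constructor needed. It returns Pending the first time it's polled and Ready the second time (the yield_now name is our own):

use std::future::poll_fn;
use std::task::Poll;

// A future that returns Pending on its first poll and Ready on its second,
// written with poll_fn instead of a dedicated struct and poll method.
async fn yield_now() {
    let mut yielded = false;
    poll_fn(|context| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            // Ask to be polled again right away.
            context.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}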

There are four potentially blocking operations that we need to make async: accept and write on the server side, and connect and read on the client side. Let's start with accept:

async fn accept(
    listener: &mut TcpListener,
) -> io::Result<(TcpStream, SocketAddr)> {
    std::future::poll_fn(|context| match listener.accept() {
        Ok((stream, addr)) => {
            stream.set_nonblocking(true)?;
            Poll::Ready(Ok((stream, addr)))
        }
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            // TODO: This is a busy loop.
            context.waker().wake_by_ref();
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }).await
}

Playground #5

The key here is handling WouldBlock errors by converting them to Pending. Calling wake_by_ref every time we return Pending, like we did in the second version of Sleep in Part One, results in a busy loop. We'll fix that in the next section. We assume the TcpListener is already in non-blocking mode, and we also put the returned TcpStream into non-blocking mode, to prepare for async writes.

Now let's implement writes. If we wanted to copy Tokio, we'd implement the AsyncWrite trait and make everything generic, but that would take a lot of code. Instead, let's keep things short and hardcode that we're writing to a TcpStream:

async fn write_all(
    mut buf: &[u8],
    stream: &mut TcpStream,
) -> io::Result<()> {
    std::future::poll_fn(|context| {
        while !buf.is_empty() {
            match stream.write(&buf) {
                Ok(0) => {
                    let e = io::Error::from(io::ErrorKind::WriteZero);
                    return Poll::Ready(Err(e));
                }
                Ok(n) => buf = &buf[n..],
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
        Poll::Ready(Ok(()))
    }).await
}

Playground #5

TcpStream::write isn't guaranteed to consume all of buf, so we need to call it in a loop, advancing buf each time. It's unlikely that we'll ever see Ok(0) from a TcpStream, but if we do, it's better to raise an error than to loop forever. The loop condition also means we won't call write at all if buf is initially empty, which matches the standard behavior of Write::write_all.

Now we can write the async version of server_main:

async fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
    let start_msg = format!("start {n}\n");
    write_all(start_msg.as_bytes(), &mut socket).await?;
    sleep(Duration::from_secs(1)).await;
    let end_msg = format!("end {n}\n");
    write_all(end_msg.as_bytes(), &mut socket).await?;
    Ok(())
}

async fn server_main(mut listener: TcpListener) -> io::Result<()> {
    let mut n = 1;
    loop {
        let (socket, _) = accept(&mut listener).await?;
        spawn(async move { one_response(socket, n).await.unwrap() });
        n += 1;
    }
}

Playground #5

As in the threads example at the beginning, we never join server tasks, so we use unwrap to print error details to stderr if anything fails. Previously we did this inside a closure; here we do it inside an async block, which works like an anonymous async function that takes no arguments.

Hopefully this will work, but we need to rewrite the client before we can test it.

We just implemented async writes, so let's do async reads next. The counterpart of Write::write_all is Read::read_to_end, but that's not quite what we want here. We want to print data as soon as it arrives, rather than collecting it in a Vec and printing everything at the end. Let's simplify the problem again and hardcode the printing. We'll call it print_all:

async fn print_all(stream: &mut TcpStream) -> io::Result<()> {
    std::future::poll_fn(|context| {
        loop {
            let mut buf = [0; 1024];
            match stream.read(&mut buf) {
                Ok(0) => return Poll::Ready(Ok(())), // EOF
                // Assume that printing doesn't block.
                Ok(n) => io::stdout().write_all(&buf[..n])?,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // TODO: This is a busy loop.
                    context.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
    }).await
}

Playground #5

Ok(0) from read means end-of-file, but otherwise the implementation is similar to write_all above.

The other async piece we need for our client is connect, but there are a couple of problems with it. First, TcpStream::connect creates a new stream, and there's no way for us to call set_nonblocking on that stream before connect starts talking to the network. Second, connect may involve a DNS lookup, and async DNS is a notorious can of worms. Solving these problems here would be a lot of work for little benefit… so we're going to cheat and just assume that connect doesn't block.

With one real async piece and one outright lie, we can write client_main:

async fn client_main() -> io::Result<()> {
    // XXX: Assume that connect() returns quickly.
    let mut socket = TcpStream::connect("localhost:8000")?;
    socket.set_nonblocking(true)?;
    print_all(&mut socket).await?;
    Ok(())
}

Playground #5

And finally async_main:

async fn async_main() -> io::Result<()> {
    // Avoid a race between bind and connect by binding before spawn.
    let listener = TcpListener::bind("0.0.0.0:8000")?;
    listener.set_nonblocking(true)?;
    // Start the server on a background task.
    spawn(async { server_main(listener).await.unwrap() });
    // Run ten clients as ten different tasks.
    let mut task_handles = Vec::new();
    for _ in 1..=10 {
        task_handles.push(spawn(client_main()));
    }
    for handle in task_handles {
        handle.await?;
    }
    Ok(())
}

Playground #5

It works! It spins in a busy loop and consumes 100% of the CPU, but it works.

Poll

The second big problem we need to solve is putting the main loop to sleep until input arrives. We can't do this by ourselves; we need help from the operating system. For this we'll use the poll system call, which is available on all Unix-like operating systems, including Linux and macOS. We'll invoke it through the C standard library binding libc::poll, which looks like this in Rust:

pub unsafe extern "C" fn poll(
    fds: *mut pollfd,
    nfds: nfds_t,
    timeout: c_int,
) -> c_int

libc::poll takes a list of “file descriptors to poll” and a timeout in milliseconds. The timeout will let us wake up for sleeps in addition to I/O, replacing thread::sleep in our main loop. Each pollfd looks like this:

struct pollfd {
    fd: c_int,
    events: c_short,
    revents: c_short,
}

The fd field is a “file descriptor”, or in Rust terms, a “raw” file descriptor. It's the identifier that Unix-like operating systems use to track open resources like files and sockets. We can get a descriptor from a TcpListener or TcpStream by calling .as_raw_fd(), which returns RawFd, an alias for c_int.

The events field is a collection of bitflags indicating what we're waiting for. The most common flags are POLLIN, meaning input is available, and POLLOUT, meaning there's space available in output buffers. We'll wait for POLLIN when we get WouldBlock from a read, and for POLLOUT when we get WouldBlock from a write.

The revents field (“returned events”) is similar, but it's used for output rather than input. After poll returns, the bits in this field indicate whether the corresponding descriptor was one of the ones that caused the wakeup. We could use this to poll only the specific tasks that got woken up, but for simplicity we'll ignore it and poll every task each time we wake up.
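
For the record, if we did want to use revents, the per-descriptor check might look something like this sketch (the was_woken helper is our own invention):

// A sketch of inspecting revents after libc::poll returns. A descriptor
// participated in the wakeup if any of the events we asked about are set.
fn was_woken(poll_fd: &libc::pollfd) -> bool {
    poll_fd.revents & (libc::POLLIN | libc::POLLOUT) != 0
}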

Our async I/O functions, accept, write_all, and print_all, will need to send pollfds and Wakers back to main, so that main can call libc::poll. We'll add a couple more global Vecs for this, plus a helper function to populate them:

static POLL_FDS: Mutex<Vec<libc::pollfd>> = Mutex::new(Vec::new());
static POLL_WAKERS: Mutex<Vec<Waker>> = Mutex::new(Vec::new());

fn register_pollfd(
    context: &mut Context,
    fd: &impl AsRawFd,
    events: libc::c_short,
) {
    let mut poll_fds = POLL_FDS.lock().unwrap();
    let mut poll_wakers = POLL_WAKERS.lock().unwrap();
    poll_fds.push(libc::pollfd {
        fd: fd.as_raw_fd(),
        events,
        revents: 0,
    });
    poll_wakers.push(context.waker().clone());
}

Playground #6

Now our async I/O functions can call register_pollfd instead of wake_by_ref. accept and print_all are read operations, so they handle WouldBlock by setting POLLIN:

Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, listener, libc::POLLIN);
    Poll::Pending
}

Playground #6

write_all handles WouldBlock by setting POLLOUT:

Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
    register_pollfd(context, stream, libc::POLLOUT);
    return Poll::Pending;
}

Playground #6

Finally, main. Let's start by preparing the timeout argument for libc::poll. This is similar to the next-wake-time computation we've been doing all along, except that now there might not be one, and we need to convert it to milliseconds:

// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
    continue;
}
// All tasks are either sleeping or blocked on IO. Use libc::poll to wait
// for IO on any of the POLL_FDS. If there are any WAKE_TIMES, use the
// earliest as a timeout.
let mut wake_times = WAKE_TIMES.lock().unwrap();
let timeout_ms = if let Some(time) = wake_times.keys().next() {
    let duration = time.saturating_duration_since(Instant::now());
    duration.as_millis() as libc::c_int
} else {
    -1 // infinite timeout
};

Playground #6

After all this preparation, we can replace thread::sleep with libc::poll in our main loop. It's an “extern” function, so calling it is unsafe:

let mut poll_fds = POLL_FDS.lock().unwrap();
let mut poll_wakers = POLL_WAKERS.lock().unwrap();
let poll_error_code = unsafe {
    libc::poll(
        poll_fds.as_mut_ptr(),
        poll_fds.len() as libc::nfds_t,
        timeout_ms,
    )
};
if poll_error_code < 0 {
    return Err(io::Error::last_os_error());
}

Playground #6

And finally, when we wake up and libc::poll returns, we need to clear POLL_FDS and invoke all the POLL_WAKERS. The main loop still polls every task every time it wakes up, and tasks that aren't Ready re-register themselves in POLL_FDS before the next sleep.

poll_fds.clear();
poll_wakers.drain(..).for_each(Waker::wake);
// Invoke Wakers from WAKE_TIMES if their time has come.
while let Some(entry) = wake_times.first_entry() {
    …

Playground #6

It works!

That's it. We did it. Our main loop has finally become an event loop.

I hope this little adventure has made async Rust, and async I/O in general, a little less mysterious. Good luck! :)
