Asynchronous Rust in three parts. Part Three: IO
Of course, async/await wasn't invented just for sleeping. Our focus from the beginning has been input/output (I/O), and network I/O in particular. Armed with futures and tasks, we can now move on to real examples.
Let's go back to regular, non-async Rust for a minute, and start with a simple server and a client that talks to it. Using threads, we'll combine the server and several clients into one example that runs on the Playground. Once that combination works, we'll convert it to async, using the main loop we wrote in Part Two.
Here is our simple server:
use std::io::{self, Write};
use std::net::TcpListener;
use std::thread;
use std::time::Duration;

fn main() -> io::Result<()> {
let listener = TcpListener::bind("0.0.0.0:8000")?;
let mut n = 1;
loop {
let (mut socket, _) = listener.accept()?;
let start_msg = format!("start {n}\n");
socket.write_all(start_msg.as_bytes())?;
thread::sleep(Duration::from_secs(1));
let end_msg = format!("end {n}\n");
socket.write_all(end_msg.as_bytes())?;
n += 1;
}
}
The server listens on port 8000. For each connection, it writes a start message, sleeps for one second, and then writes an end message.
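If you run the server and connect to it, each connection receives two lines, one second apart, something like:
start 1
end 1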
Below is the client for our simple server:
use std::io;
use std::net::TcpStream;

fn main() -> io::Result<()> {
let mut socket = TcpStream::connect("localhost:8000")?;
io::copy(&mut socket, &mut io::stdout())?;
Ok(())
}
The client opens a connection to the server and copies the bytes it receives to standard output as they arrive. It doesn't explicitly sleep, but it still takes a second to run, because the server doesn't finish its response until a second has passed. Under the hood, io::copy is a convenient wrapper around the standard Read::read and Write::write methods, and read blocks until data arrives.
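To make that concrete, here's roughly what io::copy does internally. This is a simplified sketch, not the real standard library implementation, which also retries interrupted reads and has other optimizations:
use std::io::{self, Read, Write};

// A simplified sketch of io::copy: read into a buffer, write out
// whatever arrived, and stop when read returns 0 (end-of-file).
fn copy_sketch(reader: &mut impl Read, writer: &mut impl Write) -> io::Result<u64> {
    let mut buf = [0u8; 8192];
    let mut total = 0u64;
    loop {
        let n = reader.read(&mut buf)?; // blocks until data arrives
        if n == 0 {
            return Ok(total); // EOF
        }
        writer.write_all(&buf[..n])?;
        total += n as u64;
    }
}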
These two programs can't talk to each other on the Playground, though. You might want to run them on your own machine, or better yet on two different machines on your WiFi network. If you haven't done that before, it can be surprisingly fun to see them work over a real network. It may also help to review the web server project from Chapter 20 of The Book.
Threads
Let's get this code running on the Playground by combining the client and server into one program. Since both of them block, we'll need to run them on separate threads. We'll rename their main functions to client_main and server_main, and while we're at it, we'll run ten clients at the same time:
fn main() -> io::Result<()> {
// Avoid a race between bind and connect by binding before spawn.
let listener = TcpListener::bind("0.0.0.0:8000")?;
// Start the server on a background thread.
thread::spawn(|| server_main(listener).unwrap());
// Run ten clients on ten different threads.
let mut client_handles = Vec::new();
for _ in 1..=10 {
client_handles.push(thread::spawn(client_main));
}
for handle in client_handles {
handle.join().unwrap()?;
}
Ok(())
}
The code works on the Playground, but it takes ten seconds. Although the clients run in parallel, the server handles them one at a time. Let's have the server spawn a new thread for each incoming connection:
fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
let start_msg = format!("start {n}\n");
socket.write_all(start_msg.as_bytes())?;
thread::sleep(Duration::from_secs(1));
let end_msg = format!("end {n}\n");
socket.write_all(end_msg.as_bytes())?;
Ok(())
}
fn server_main(listener: TcpListener) -> io::Result<()> {
let mut n = 1;
loop {
let (socket, _) = listener.accept()?;
thread::spawn(move || one_response(socket, n).unwrap());
n += 1;
}
}
The code still works, and now it finishes in about a second. This is exactly what we wanted. Now we're ready for our final project: extending the main loop from Part Two and converting this example to async.
We face two main problems. First, we need I/O functions that return immediately instead of blocking, even when input hasn't arrived yet, so that we can call them from Future::poll. Second, when all of our tasks are waiting on input, we need to put the program to sleep rather than busy-wait, and we need a way to wake up when any input arrives.
Non-blocking
The standard library has a solution to the first problem: TcpListener and TcpStream both have a set_nonblocking method, which makes accept, read, and write return ErrorKind::WouldBlock instead of blocking.
Technically, set_nonblocking by itself is enough to do asynchronous I/O. Without solving the second problem we'll spin at 100% CPU in a "busy loop" until the job is done, but our output will still be correct, and we can get some prep work out of the way before tackling the harder part.
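To see that on its own, here's a sketch of the busy-loop approach in plain, non-async code. It isn't part of the final project, but the WouldBlock handling is the same pattern we're about to move into poll functions:
use std::io;
use std::net::{TcpListener, TcpStream};

// Spin until a connection arrives. Correct, but it burns a CPU core.
fn busy_accept(listener: &TcpListener) -> io::Result<TcpStream> {
    listener.set_nonblocking(true)?;
    loop {
        match listener.accept() {
            Ok((stream, _addr)) => return Ok(stream),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
}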
When we wrote Foo, JoinAll, and Sleep in Part One, each of them required a struct definition, a poll function, and a constructor function. To cut down on boilerplate this time, we'll use std::future::poll_fn, which takes a standalone poll function and generates the rest of the future for us.
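As a quick illustration of poll_fn, here's a toy future of our own (yield_once isn't part of the project) that returns Pending exactly once before completing:
use std::future::poll_fn;
use std::task::Poll;

// The closure is the poll function; poll_fn wraps it in an anonymous
// future type, so we don't need to define a struct or a constructor.
async fn yield_once() {
    let mut polled = false;
    poll_fn(|context| {
        if polled {
            Poll::Ready(())
        } else {
            polled = true;
            context.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    })
    .await;
}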
There are four potentially blocking operations that we need to make asynchronous: accept and write on the server side, and connect and read on the client side. Let's start with accept:
async fn accept(
listener: &mut TcpListener,
) -> io::Result<(TcpStream, SocketAddr)> {
std::future::poll_fn(|context| match listener.accept() {
Ok((stream, addr)) => {
stream.set_nonblocking(true)?;
Poll::Ready(Ok((stream, addr)))
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// TODO: This is a busy loop.
context.waker().wake_by_ref();
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}).await
}
The key here is handling WouldBlock errors by converting them to Pending. Calling wake_by_ref whenever we return Pending, like we did in the second version of Sleep in Part One, results in a busy loop. We'll fix that in the next section. We assume that the TcpListener is already in non-blocking mode, and we also put the returned TcpStream into non-blocking mode, to prepare for asynchronous writes.
Now let's implement writing. If we wanted to copy Tokio, we would implement the AsyncWrite trait and make everything generic, but that would take a lot of code. Instead, let's keep things short and hardcode that we're writing to a TcpStream:
async fn write_all(
mut buf: &[u8],
stream: &mut TcpStream,
) -> io::Result<()> {
std::future::poll_fn(|context| {
while !buf.is_empty() {
match stream.write(&buf) {
Ok(0) => {
let e = io::Error::from(io::ErrorKind::WriteZero);
return Poll::Ready(Err(e));
}
Ok(n) => buf = &buf[n..],
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// TODO: This is a busy loop.
context.waker().wake_by_ref();
return Poll::Pending;
}
Err(e) => return Poll::Ready(Err(e)),
}
}
Poll::Ready(Ok(()))
}).await
}
TcpStream::write isn't guaranteed to consume all of buf, so we have to call it in a loop, advancing buf each time. It's unlikely that we'll ever see Ok(0) from a TcpStream, but if we do, it's better to turn it into an error than to loop forever. The loop condition also means we won't call write at all if buf is initially empty, which matches the default behavior of Write::write_all.
Now we can write the async version of server_main:
async fn one_response(mut socket: TcpStream, n: u64) -> io::Result<()> {
let start_msg = format!("start {n}\n");
write_all(start_msg.as_bytes(), &mut socket).await?;
sleep(Duration::from_secs(1)).await;
let end_msg = format!("end {n}\n");
write_all(end_msg.as_bytes(), &mut socket).await?;
Ok(())
}
async fn server_main(mut listener: TcpListener) -> io::Result<()> {
let mut n = 1;
loop {
let (socket, _) = accept(&mut listener).await?;
spawn(async move { one_response(socket, n).await.unwrap() });
n += 1;
}
}
As with the thread example at the start, we never join server tasks, so we use unwrap to print error details to stderr if they happen. Previously we did that inside a closure; here we do it inside an async block, which works like an anonymous async function that takes no arguments.
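As a sketch of our own (not code from the project), an async block is an expression that evaluates to a future, just like calling an async function does:
// These two futures do the same thing. Neither runs any code until
// it's polled, for example by .await or by handing it to spawn.
async fn two() -> i32 {
    1 + 1
}

fn demo() {
    let from_fn = two();
    let from_block = async { 1 + 1 };
    let _ = (from_fn, from_block);
}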
Hopefully this will work, but we need to rewrite the client before we can test it.
We just implemented async writing, so next let's do async reading. The counterpart of Write::write_all is Read::read_to_end, but that's not quite what we want here. We want to print data as soon as it arrives, rather than collect it in a Vec and print everything at the end. Let's simplify the problem again and hardcode the printing. We'll call it print_all:
async fn print_all(stream: &mut TcpStream) -> io::Result<()> {
std::future::poll_fn(|context| {
loop {
let mut buf = [0; 1024];
match stream.read(&mut buf) {
Ok(0) => return Poll::Ready(Ok(())), // EOF
// Assume that printing doesn't block.
Ok(n) => io::stdout().write_all(&buf[..n])?,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// TODO: This is a busy loop.
context.waker().wake_by_ref();
return Poll::Pending;
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}).await
}
Ok(0) from read means end-of-file, but otherwise this implementation is similar to write_all above.
The other async piece we need for our client is connect, but there are a couple of problems with it. First, TcpStream::connect creates a new socket, and there's no way for us to call set_nonblocking on that socket before connect starts talking to the network. Second, connect can involve a DNS lookup, and async DNS is a whole can of worms. Solving these problems here would be a lot of work for not much benefit… so instead we're going to cheat and just assume that connect doesn't block.
With one genuinely async piece and one outright lie, we can write client_main:
async fn client_main() -> io::Result<()> {
// XXX: Assume that connect() returns quickly.
let mut socket = TcpStream::connect("localhost:8000")?;
socket.set_nonblocking(true)?;
print_all(&mut socket).await?;
Ok(())
}
And finally, async_main:
async fn async_main() -> io::Result<()> {
// Avoid a race between bind and connect by binding before spawn.
let listener = TcpListener::bind("0.0.0.0:8000")?;
listener.set_nonblocking(true)?;
// Start the server on a background task.
spawn(async { server_main(listener).await.unwrap() });
// Run ten clients as ten different tasks.
let mut task_handles = Vec::new();
for _ in 1..=10 {
task_handles.push(spawn(client_main()));
}
for handle in task_handles {
handle.await?;
}
Ok(())
}
It works! It spins in a busy loop, consuming 100% CPU, but it works.
Poll
The second big problem we need to solve is putting the main loop to sleep until input arrives. We can't do that ourselves; we need help from the operating system. For this we'll use the poll system call, which is available on all Unix-like operating systems, including Linux and macOS. We'll call it through the libc crate's binding, libc::poll, which looks like this in Rust:
pub unsafe extern "C" fn poll(
fds: *mut pollfd,
nfds: nfds_t,
timeout: c_int,
) -> c_int
libc::poll takes a list of "poll file descriptors" and a timeout in milliseconds. The timeout will let us wake up for sleeps in addition to I/O, replacing thread::sleep in our main loop. Each pollfd looks like this:
struct pollfd {
fd: c_int,
events: c_short,
revents: c_short,
}
The fd field is a "file descriptor", or in Rust terms a "raw" file descriptor. It's the identifier that Unix-like operating systems use to track open resources like files and sockets. We can get a descriptor from a TcpListener or a TcpStream by calling .as_raw_fd(), which returns RawFd, an alias for c_int.
The events field is a set of bitflags indicating what we're waiting for. The most common events are POLLIN, meaning input is available, and POLLOUT, meaning space is available in output buffers. We'll wait for POLLIN when we get WouldBlock from a read, and we'll wait for POLLOUT when we get WouldBlock from a write.
The revents field ("returned events") is similar, but it's used for output rather than input. After poll returns, the bits in this field indicate whether the corresponding descriptor was one of the ones that caused the wakeup. We could use this to poll only the specific tasks that were woken up, but for simplicity we'll ignore it and poll every task each time we wake up.
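Putting those fields together, here's a minimal standalone example of a raw libc::poll call, assuming the libc crate as a dependency and a Unix-like OS. It waits up to one second for stdin (file descriptor 0) to become readable:
fn main() {
    // One pollfd: wait for input on stdin.
    let mut fds = [libc::pollfd {
        fd: 0, // stdin
        events: libc::POLLIN,
        revents: 0,
    }];
    let n = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000) };
    // A negative return is an error, zero is a timeout, and a positive
    // return is the number of descriptors with bits set in revents.
    if n > 0 && fds[0].revents & libc::POLLIN != 0 {
        println!("stdin is readable");
    }
}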
Our async I/O functions, accept, write_all, and print_all, will need to send pollfds and Wakers back to main, so that main can call libc::poll. We'll add a couple more global Vecs for this, plus a helper function to populate them:
static POLL_FDS: Mutex<Vec<libc::pollfd>> = Mutex::new(Vec::new());
static POLL_WAKERS: Mutex<Vec<Waker>> = Mutex::new(Vec::new());
fn register_pollfd(
context: &mut Context,
fd: &impl AsRawFd,
events: libc::c_short,
) {
let mut poll_fds = POLL_FDS.lock().unwrap();
let mut poll_wakers = POLL_WAKERS.lock().unwrap();
poll_fds.push(libc::pollfd {
fd: fd.as_raw_fd(),
events,
revents: 0,
});
poll_wakers.push(context.waker().clone());
}
Now our async I/O functions can call register_pollfd instead of wake_by_ref. accept and print_all are read operations, so they handle WouldBlock by setting POLLIN:
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
register_pollfd(context, listener, libc::POLLIN);
Poll::Pending
}
write_all handles WouldBlock by setting POLLOUT:
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
register_pollfd(context, stream, libc::POLLOUT);
return Poll::Pending;
}
Finally, main. We'll start by preparing the timeout argument for libc::poll. This is similar to how we've been computing the next wake time all along, except that now there might not be one, and we need to convert it to milliseconds:
// Some tasks might wake other tasks. Re-poll if the AwakeFlag has been
// set. Polling futures that aren't ready yet is inefficient but allowed.
if awake_flag.check_and_clear() {
continue;
}
// All tasks are either sleeping or blocked on IO. Use libc::poll to wait
// for IO on any of the POLL_FDS. If there are any WAKE_TIMES, use the
// earliest as a timeout.
let mut wake_times = WAKE_TIMES.lock().unwrap();
let timeout_ms = if let Some(time) = wake_times.keys().next() {
let duration = time.saturating_duration_since(Instant::now());
duration.as_millis() as libc::c_int
} else {
-1 // infinite timeout
};
After all that preparation, we can replace thread::sleep with libc::poll in our main loop. It's a foreign function, so calling it is unsafe:
let mut poll_fds = POLL_FDS.lock().unwrap();
let mut poll_wakers = POLL_WAKERS.lock().unwrap();
let poll_error_code = unsafe {
libc::poll(
poll_fds.as_mut_ptr(),
poll_fds.len() as libc::nfds_t,
timeout_ms,
)
};
if poll_error_code < 0 {
return Err(io::Error::last_os_error());
}
And finally, when we wake up because libc::poll has returned, we need to clear POLL_FDS and invoke all the POLL_WAKERS. The main loop still polls every task every time, and tasks that aren't Ready will re-register themselves in POLL_FDS before the next sleep:
poll_fds.clear();
poll_wakers.drain(..).for_each(Waker::wake);
// Invoke Wakers from WAKE_TIMES if their time has come.
while let Some(entry) = wake_times.first_entry() {
…
It works!
That's it. We did it. Our main loop has finally become an event loop.
I hope this little adventure has made async Rust, and async I/O in general, a little less mysterious. Good luck! :)