Getting Started

Welcome to Asynchronous Programming in Rust! If you're looking to start writing asynchronous Rust code, you've come to the right place. Whether you're building a web server, a database, or an operating system, this book will show you how to use Rust's asynchronous programming tools to get the most out of your hardware.

What This Book Covers

This book aims to be a comprehensive, up-to-date guide to using Rust's async language features and libraries, appropriate for beginners and old hands alike.

  • The early chapters provide an introduction to async programming in general, and to Rust's particular take on it.

  • The middle chapters discuss key utilities and control-flow tools you can use when writing async code, and describe best-practices for structuring libraries and applications to maximize performance and reusability.

  • The last section of the book covers the broader async ecosystem, and provides a number of examples of how to accomplish common tasks.

With that out of the way, let's explore the exciting world of Asynchronous Programming in Rust!

Why Async?

We all love how Rust allows us to write fast, safe software. But why write asynchronous code?

Asynchronous code allows us to run multiple tasks concurrently on the same OS thread. In a typical threaded application, if you wanted to download two different webpages at the same time, you would spread the work across two different threads, like this:


#![allow(unused_variables)]
fn main() {
fn get_two_sites() {
    // Spawn two threads to do work.
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // Wait for both threads to complete.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}
}

This works fine for many applications-- after all, threads were designed to do just this: run multiple different tasks at once. However, they also come with some limitations. There's a lot of overhead involved in the process of switching between different threads and sharing data between threads. Even a thread which just sits and does nothing uses up valuable system resources. These are the costs that asynchronous code is designed to eliminate. We can rewrite the function above using Rust's async/.await notation, which will allow us to run multiple tasks at once without creating multiple threads:


#![allow(unused_variables)]
fn main() {
async fn get_two_sites_async() {
    // Create two different "futures" which, when run to completion,
    // will asynchronously download the webpages.
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // Run both futures to completion at the same time.
    join!(future_one, future_two);
}
}

Overall, asynchronous applications have the potential to be much faster and use fewer resources than a corresponding threaded implementation. However, there is a cost. Threads are natively supported by the operating system, and using them doesn't require any special programming model-- any function can create a thread, and calling a function that uses threads is usually just as easy as calling any normal function. However, asynchronous functions require special support from the language or libraries. In Rust, async fn creates an asynchronous function which returns a Future. To execute the body of the function, the returned Future must be run to completion.

It's important to remember that traditional threaded applications can be quite effective, and that Rust's small memory footprint and predictability mean that you can get far without ever using async. The increased complexity of the asynchronous programming model isn't always worth it, and it's important to consider whether your application would be better served by using a simpler threaded model.

The State of Asynchronous Rust

The asynchronous Rust ecosystem has undergone a lot of evolution over time, so it can be hard to know what tools to use, what libraries to invest in, or what documentation to read. However, the Future trait inside the standard library and the async/await language feature has recently been stabilized. The ecosystem as a whole is therefore in the midst of migrating to the newly-stabilized API, after which point churn will be significantly reduced.

At the moment, however, the ecosystem is still undergoing rapid development and the asynchronous Rust experience is unpolished. Most libraries still use the 0.1 definitions of the futures crate, meaning that to interoperate developers frequently need to reach for the compat functionality from the 0.3 futures crate. The async/await language feature is still new. Important extensions like async fn syntax in trait methods are still unimplemented, and the current compiler error messages can be difficult to parse.

That said, Rust is well on its way to having some of the most performant and ergonomic support for asynchronous programming around, and if you're not afraid of doing some spelunking, enjoy your dive into the world of asynchronous programming in Rust!

async/.await Primer

async/.await is Rust's built-in tool for writing asynchronous functions that look like synchronous code. async transforms a block of code into a state machine that implements a trait called Future. Whereas calling a blocking function in a synchronous method would block the whole thread, blocked Futures will yield control of the thread, allowing other Futures to run.

To create an asynchronous function, you can use the async fn syntax:


#![allow(unused_variables)]
fn main() {
async fn do_something() { ... }
}

The value returned by async fn is a Future. For anything to happen, the Future needs to be run on an executor.

// `block_on` blocks the current thread until the provided future has run to
// completion. Other executors provide more complex behavior, like scheduling
// multiple futures onto the same thread.
use futures::executor::block_on;

async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    block_on(future); // `future` is run and "hello, world!" is printed
}

Inside an async fn, you can use .await to wait for the completion of another type that implements the Future trait, such as the output of another async fn. Unlike block_on, .await doesn't block the current thread, but instead asynchronously waits for the future to complete, allowing other tasks to run if the future is currently unable to make progress.

For example, imagine that we have three async fn: learn_song, sing_song, and dance:


#![allow(unused_variables)]
fn main() {
async fn learn_song() -> Song { ... }
async fn sing_song(song: Song) { ... }
async fn dance() { ... }
}

One way to do learn, sing, and dance would be to block on each of these individually:

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

However, we're not giving the best performance possible this way-- we're only ever doing one thing at once! Clearly we have to learn the song before we can sing it, but it's possible to dance at the same time as learning and singing the song. To do this, we can create two separate async fn which can be run concurrently:

async fn learn_and_sing() {
    // Wait until the song has been learned before singing it.
    // We use `.await` here rather than `block_on` to prevent blocking the
    // thread, which makes it possible to `dance` at the same time.
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` is like `.await` but can wait for multiple futures concurrently.
    // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
    // future will take over the current thread. If `dance` becomes blocked,
    // `learn_and_sing` can take back over. If both futures are blocked, then
    // `async_main` is blocked and will yield to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

In this example, learning the song must happen before singing the song, but both learning and singing can happen at the same time as dancing. If we used block_on(learn_song()) rather than learn_song().await in learn_and_sing, the thread wouldn't be able to do anything else while learn_song was running. This would make it impossible to dance at the same time. By .await-ing the learn_song future, we allow other tasks to take over the current thread if learn_song is blocked. This makes it possible to run multiple futures to completion concurrently on the same thread.

Now that you've learned the basics of async/await, let's try out an example.

Applied: Simple HTTP Server

Let's use async/.await to build an echo server!

To start, run rustup update stable to make sure you've got stable Rust 1.39 or newer. Once you've done that, run cargo new async-await-echo to create a new project, and open up the resulting async-await-echo folder.

Let's add some dependencies to the Cargo.toml file:

[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures = { version = "0.3", features = ["compat"] }

# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"

Now that we've got our dependencies out of the way, let's start writing some code. We have some imports to add:


#![allow(unused_variables)]
fn main() {
use {
    hyper::{
        // Miscellaneous types from Hyper for working with HTTP.
        Body, Client, Request, Response, Server, Uri,

        // This function turns a closure which returns a future into an
        // implementation of the the Hyper `Service` trait, which is an
        // asynchronous function from a generic `Request` to a `Response`.
        service::service_fn,

        // A function which runs a future to completion using the Hyper runtime.
        rt::run,
    },
    futures::{
        // Extension trait for futures 0.1 futures, adding the `.compat()` method
        // which allows us to use `.await` on 0.1 futures.
        compat::Future01CompatExt,
        // Extension traits providing additional methods on futures.
        // `FutureExt` adds methods that work for all futures, whereas
        // `TryFutureExt` adds methods to futures that return `Result` types.
        future::{FutureExt, TryFutureExt},
    },
    std::net::SocketAddr,
};
}

Once the imports are out of the way, we can start putting together the boilerplate to allow us to serve requests:

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // Always return successfully with a response containing a body with
    // a friendly greeting ;)
    Ok(Response::new(Body::from("hello, world!")))
}

async fn run_server(addr: SocketAddr) {
    println!("Listening on http://{}", addr);

    // Create a server bound on the provided address
    let serve_future = Server::bind(&addr)
        // Serve requests using our `async serve_req` function.
        // `serve` takes a closure which returns a type implementing the
        // `Service` trait. `service_fn` returns a value implementing the
        // `Service` trait, and accepts a closure which goes from request
        // to a future of the response. To use our `serve_req` function with
        // Hyper, we have to box it and put it in a compatability
        // wrapper to go from a futures 0.3 future (the kind returned by
        // `async fn`) to a futures 0.1 future (the kind used by Hyper).
        .serve(|| service_fn(|req| serve_req(req).boxed().compat()));

    // Wait for the server to complete serving or exit with an error.
    // If an error occurred, print it to stderr.
    if let Err(e) = serve_future.compat().await {
        eprintln!("server error: {}", e);
    }
}

fn main() {
    // Set the address to run our socket on.
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    // Call our `run_server` function, which returns a future.
    // As with every `async fn`, for `run_server` to do anything,
    // the returned future needs to be run. Additionally,
    // we need to convert the returned future from a futures 0.3 future into a
    // futures 0.1 future.
    let futures_03_future = run_server(addr);
    let futures_01_future = futures_03_future.unit_error().boxed().compat();

    // Finally, we can run the future to completion using the `run` function
    // provided by Hyper.
    run(futures_01_future);
}

If you cargo run now, you should see the message "Listening on http://127.0.0.1:3000" printed on your terminal. If you open that URL in your browser of choice, you'll see "hello, world!" appear in your browser. Congratulations! You just wrote your first asynchronous webserver in Rust.

You can also inspect the request itself, which contains information such as the request URI, HTTP version, headers, and other metadata. For example, we can print out the URI of the request like this:


#![allow(unused_variables)]
fn main() {
println!("Got request at {:?}", req.uri());
}

You may have noticed that we're not yet doing anything asynchronous when handling the request-- we just respond immediately, so we're not taking advantage of the flexibility that async fn gives us. Rather than just returning a static message, let's try proxying the user's request to another website using Hyper's HTTP client.

We start by parsing out the URL we want to request:


#![allow(unused_variables)]
fn main() {
let url_str = "http://www.rust-lang.org/en-US/";
let url = url_str.parse::<Uri>().expect("failed to parse URL");
}

Then we can create a new hyper::Client and use it to make a GET request, returning the response to the user:


#![allow(unused_variables)]
fn main() {
let res = Client::new().get(url).compat().await;
// Return the result of the request directly to the user
println!("request finished-- returning response");
res
}

Client::get returns a hyper::client::FutureResponse, which implements Future<Output = Result<Response, Error>> (or Future<Item = Response, Error = Error> in futures 0.1 terms). When we .await that future, an HTTP request is sent out, the current task is suspended, and the task is queued to be continued once a response has become available.

Now, if you cargo run and open http://127.0.0.1:3000/foo in your browser, you'll see the Rust homepage, and the following terminal output:

Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response

Congratulations! You just proxied an HTTP request.

Under the Hood: Executing Futures and Tasks

In this section, we'll cover the underlying structure of how Futures and asynchronous tasks are scheduled. If you're only interested in learning how to write higher-level code that uses existing Future types and aren't interested in the details of how Future types work, you can skip ahead to the async/await chapter. However, several of the topics discussed in this chapter are useful for understanding how async/await code works, understanding the runtime and performance properties of async/await code, and building new asynchronous primitives. If you decide to skip this section now, you may want to bookmark it to revisit in the future.

Now, with that out of the, way, let's talk about the Future trait.

The Future Trait

The Future trait is at the center of asynchronous programming in Rust. A Future is an asynchronous computation that can produce a value (although that value may be empty, e.g. ()). A simplified version of the future trait might look something like this:


#![allow(unused_variables)]
fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
}

Futures can be advanced by calling the poll function, which will drive the future as far towards completion as possible. If the future completes, it returns Poll::Ready(result). If the future is not able to complete yet, it returns Poll::Pending and arranges for the wake() function to be called when the Future is ready to make more progress. When wake() is called, the executor driving the Future will call poll again so that the Future can make more progress.

Without wake(), the executor would have no way of knowing when a particular future could make progress, and would have to be constantly polling every future. With wake(), the executor knows exactly which futures are ready to be polled.

For example, consider the case where we want to read from a socket that may or may not have data available already. If there is data, we can read it in and return Poll::Ready(data), but if no data is ready, our future is blocked and can no longer make progress. When no data is available, we must register wake to be called when data becomes ready on the socket, which will tell the executor that our future is ready to make progress. A simple SocketRead future might look something like this:


#![allow(unused_variables)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

This model of Futures allows for composing together multiple asynchronous operations without needing intermediate allocations. Running multiple futures at once or chaining futures together can be implemented via allocation-free state machines, like this:


#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
}

This shows how multiple futures can be run simultaneously without needing separate allocations, allowing for more efficient asynchronous programs. Similarly, multiple sequential futures can be run one after another, like this:


#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
}

These examples show how the Future trait can be used to express asynchronous control flow without requiring multiple allocated objects and deeply nested callbacks. With the basic control-flow out of the way, let's talk about the real Future trait and how it is different.


#![allow(unused_variables)]
fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
}

The first change you'll notice is that our self type is no longer &mut self, but has changed to Pin<&mut Self>. We'll talk more about pinning in a later section, but for now know that it allows us to create futures that are immovable. Immovable objects can store pointers between their fields, e.g. struct MyFut { a: i32, ptr_to_a: *const i32 }. Pinning is necessary to enable async/await.

Secondly, wake: fn() has changed to &mut Context<'_>. In SimpleFuture, we used a call to a function pointer (fn()) to tell the future executor that the future in question should be polled. However, since fn() is zero-sized, it can't store any data about which Future called wake.

In a real-world scenario, a complex application like a web server may have thousands of different connections whose wakeups should all be managed separately. The Context type solves this by providing access to a value of type Waker, which can be used to wake up a specific task.

Task Wakeups with Waker

It's common that futures aren't able to complete the first time they are polled. When this happens, the future needs to ensure that it is polled again once it is ready to make more progress. This is done with the Waker type.

Each time a future is polled, it is polled as part of a "task". Tasks are the top-level futures that have been submitted to an executor.

Waker provides a wake() method that can be used to tell the executor that the associated task should be awoken. When wake() is called, the executor knows that the task associated with the Waker is ready to make progress, and its future should be polled again.

Waker also implements clone() so that it can be copied around and stored.

Let's try implementing a simple timer future using Waker.

Applied: Build a Timer

For the sake of the example, we'll just spin up a new thread when the timer is created, sleep for the required time, and then signal the timer future when the time window has elapsed.

Here are the imports we'll need to get started:


#![allow(unused_variables)]
fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
}

Let's start by defining the future type itself. Our future needs a way for the thread to communicate that the timer has elapsed and the future should complete. We'll use a shared Arc<Mutex<..>> value to communicate between the thread and the future.


#![allow(unused_variables)]
fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
}

Now, let's actually write the Future implementation!


#![allow(unused_variables)]
fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
}

Pretty simple, right? If the thread has set shared_state.completed = true, we're done! Otherwise, we clone the Waker for the current task and pass it to shared_state.waker so that the thread can wake the task back up.

Importantly, we have to update the Waker every time the future is polled because the future may have moved to a different task with a different Waker. This will happen when futures are passed around between tasks after being polled.

Finally, we need the API to actually construct the timer and start the thread:


#![allow(unused_variables)]
fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
}

Woot! That's all we need to build a simple timer future. Now, if only we had an executor to run the future on...

Applied: Build an Executor

Rust's Futures are lazy: they won't do anything unless actively driven to completion. One way to drive a future to completion is to .await it inside an async function, but that just pushes the problem one level up: who will run the futures returned from the top-level async functions? The answer is that we need a Future executor.

Future executors take a set of top-level Futures and run them to completion by calling poll whenever the Future can make progress. Typically, an executor will poll a future once to start off. When Futures indicate that they are ready to make progress by calling wake(), they are placed back onto a queue and poll is called again, repeating until the Future has completed.

In this section, we'll write our own simple executor capable of running a large number of top-level futures to completion concurrently.

For this example, we depend on the futures crate for the ArcWake trait, which provides an easy way to construct a Waker.

[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.17"

Next, we need the following imports at the top of src/main.rs:


#![allow(unused_variables)]
fn main() {
use {
    futures::{
        future::{FutureExt, BoxFuture},
        task::{ArcWake, waker_ref},
    },
    std::{
        future::Future,
        sync::{Arc, Mutex},
        sync::mpsc::{sync_channel, SyncSender, Receiver},
        task::{Context, Poll},
        time::Duration,
    },
    // The timer we wrote in the previous section:
    timer_future::TimerFuture,
};
}

Our executor will work by sending tasks to run over a channel. The executor will pull events off of the channel and run them. When a task is ready to do more work (is awoken), it can schedule itself to be polled again by putting itself back onto the channel.

In this design, the executor itself just needs the receiving end of the task channel. The user will get a sending end so that they can spawn new futures. Tasks themselves are just futures that can reschedule themselves, so we'll store them as a future paired with a sender that the task can use to requeue itself.


#![allow(unused_variables)]
fn main() {
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}
}

Let's also add a method to spawner to make it easy to spawn new futures. This method will take a future type, box it and put it in a FutureObj, and create a new Arc<Task> with it inside which can be enqueued onto the executor.


#![allow(unused_variables)]
fn main() {
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}
}

To poll futures, we'll need to create a Waker. As discussed in the task wakeups section, Wakers are responsible for scheduling a task to be polled again once wake is called. Remember that Wakers tell the executor exactly which task has become ready, allowing them to poll just the futures that are ready to make progress. The easiest way to create a new Waker is by implementing the ArcWake trait and then using the waker_ref or .into_waker() functions to turn an Arc<impl ArcWake> into a Waker. Let's implement ArcWake for our tasks to allow them to be turned into Wakers and awoken:


#![allow(unused_variables)]
fn main() {
impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self.task_sender.send(cloned).expect("too many tasks queued");
    }
}
}

When a Waker is created from an Arc<Task>, calling wake() on it will cause a copy of the Arc to be sent onto the task channel. Our executor then needs to pick up the task and poll it. Let's implement that:


#![allow(unused_variables)]
fn main() {
impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}
}

Congratulations! We now have a working futures executor. We can even use it to run async/.await code and custom futures, such as the TimerFuture we wrote earlier:

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Executors and System IO

In the previous section on The Future Trait, we discussed this example of a future that performed an asynchronous read on a socket:


#![allow(unused_variables)]
fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
}

This future will read available data on a socket, and if no data is available, it will yield to the executor, requesting that its task be awoken when the socket becomes readable again. However, it's not clear from this example how the Socket type is implemented, and in particular it isn't obvious how the set_readable_callback function works. How can we arrange for lw.wake() to be called once the socket becomes readable? One option would be to have a thread that continually checks whether socket is readable, calling wake() when appropriate. However, this would be quite inefficient, requiring a separate thread for each blocked IO future. This would greatly reduce the efficiency of our async code.

In practice, this problem is solved through integration with an IO-aware system blocking primitive, such as epoll on Linux, kqueue on FreeBSD and Mac OS, IOCP on Windows, and ports on Fuchsia (all of which are exposed through the cross-platform Rust crate mio). These primitives all allow a thread to block on multiple asynchronous IO events, returning once one of the events completes. In practice, these APIs usually look something like this:


#![allow(unused_variables)]
fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    id: usize,

    // A set of signals to wait for, or which occurred.
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    fn new() -> Self { ... }

    /// Express an interest in a particular IO event.
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { ... }

    /// Block until one of the events occurs.
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
println!("Socket {:?} is now {:?}", event.id, event.signals);
}

Futures executors can use these primitives to provide asynchronous IO objects such as sockets that can configure callbacks to be run when a particular IO event occurs. In the case of our SocketRead example above, the Socket::set_readable_callback function might look like the following pseudocode:


#![allow(unused_variables)]
fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
}

We can now have just one executor thread which can receive and dispatch any IO event to the appropriate Waker, which will wake up the corresponding task, allowing the executor to drive more tasks to completion before returning to check for more IO events (and the cycle continues...).

async/.await

In the first chapter, we took a brief look at async/.await and used it to build a simple server. This chapter will discuss async/.await in greater detail, explaining how it works and how async code differs from traditional Rust programs.

async/.await are special pieces of Rust syntax that make it possible to yield control of the current thread rather than blocking, allowing other code to make progress while waiting on an operation to complete.

There are two main ways to use async: async fn and async blocks. Each returns a value that implements the Future trait:


#![allow(unused_variables)]

fn main() {
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
}

As we saw in the first chapter, async bodies and other futures are lazy: they do nothing until they are run. The most common way to run a Future is to .await it. When .await is called on a Future, it will attempt to run it to completion. If the Future is blocked, it will yield control of the current thread. When more progress can be made, the Future will be picked up by the executor and will resume running, allowing the .await to resolve.

async Lifetimes

Unlike traditional functions, async fns which take references or other non-'static arguments return a Future which is bounded by the lifetime of the arguments:


#![allow(unused_variables)]
fn main() {
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}
}

This means that the future returned from an async fn must be .awaited while its non-'static arguments are still valid. In the common case of .awaiting the future immediately after calling the function (as in foo(&x).await) this is not an issue. However, if storing the future or sending it over to another task or thread, this may be an issue.

One common workaround for turning an async fn with references-as-arguments into a 'static future is to bundle the arguments with the call to the async fn inside an async block:


#![allow(unused_variables)]
fn main() {
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}
}

By moving the argument into the async block, we extend its lifetime to match that of the Future returned from the call to good.

async move

async blocks and closures allow the move keyword, much like normal closures. An async move block will take ownership of the variables it references, allowing it to outlive the current scope, but giving up the ability to share those variables with other code:


#![allow(unused_variables)]
fn main() {
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{}", my_string);
    };

    let future_two = async {
        // ...
        println!("{}", my_string);
    };

    // Run both futures to completion, printing "foo" twice:
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{}", my_string);
    }
}
}

.awaiting on a Multithreaded Executor

Note that, when using a multithreaded Future executor, a Future may move between threads, so any variables used in async bodies must be able to travel between threads, as any .await can potentially result in a switch to a new thread.

This means that it is not safe to use Rc, &RefCell or any other types that don't implement the Send trait, including references to types that don't implement the Sync trait.

(Caveat: it is possible to use these types so long as they aren't in scope during a call to .await.)

Similarly, it isn't a good idea to hold a traditional non-futures-aware lock across an .await, as it can cause the threadpool to lock up: one task could take out a lock, .await and yield to the executor, allowing another task to attempt to take the lock and cause a deadlock. To avoid this, use the Mutex in futures::lock rather than the one from std::sync.

Pinning

To poll futures, they must be pinned using a special type called Pin<T>. If you read the explanation of the Future trait in the previous section "Executing Futures and Tasks", you'll recognise Pin from the self: Pin<&mut Self> in the Future:poll method's definition. But what does it mean, and why do we need it?

Why Pinning

Pinning makes it possible to guarantee that an object won't ever be moved. To understand why this is necessary, we need to remember how async/.await works. Consider the following code:


#![allow(unused_variables)]
fn main() {
let fut_one = ...;
let fut_two = ...;
async move {
    fut_one.await;
    fut_two.await;
}
}

Under the hood, this creates an anonymous type that implements Future, providing a poll method that looks something like this:


#![allow(unused_variables)]
fn main() {
// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}
}

When poll is first called, it will poll fut_one. If fut_one can't complete, AsyncFuture::poll will return. Future calls to poll will pick up where the previous one left off. This process continues until the future is able to successfully complete.

However, what happens if we have an async block that uses references? For example:


#![allow(unused_variables)]
fn main() {
async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}
}

What struct does this compile down to?


#![allow(unused_variables)]
fn main() {
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
}

Here, the ReadIntoBuf future holds a reference into the other field of our structure, x. However, if AsyncFuture is moved, the location of x will move as well, invalidating the pointer stored in read_into_buf_fut.buf.

Pinning futures to a particular spot in memory prevents this problem, making it safe to create references to values inside an async block.

How to Use Pinning

The Pin type wraps pointer types, guaranteeing that the values behind the pointer won't be moved. For example, Pin<&mut T>, Pin<&T>, Pin<Box<T>> all guarantee that T won't be moved.

Most types don't have a problem being moved. These types implement a trait called Unpin. Pointers to Unpin types can be freely placed into or taken out of Pin. For example, u8 is Unpin, so Pin<&mut u8> behaves just like a normal &mut u8.

Some functions require the futures they work with to be Unpin. To use a Future or Stream that isn't Unpin with a function that requires Unpin types, you'll first have to pin the value using either Box::pin (to create a Pin<Box<T>>) or the pin_utils::pin_mut! macro (to create a Pin<&mut T>). Pin<Box<Fut>> and Pin<&mut Fut> can both be used as futures, and both implement Unpin.

For example:


#![allow(unused_variables)]
fn main() {
use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }

let fut = async { ... };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { ... };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK
}

The Stream Trait

The Stream trait is similar to Future but can yield multiple values before completing, similar to the Iterator trait from the standard library:


#![allow(unused_variables)]
fn main() {
trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Retuns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}
}

One common example of a Stream is the Receiver for the channel type from the futures crate. It will yield Some(val) every time a value is sent from the Sender end, and will yield None once the Sender has been dropped and all pending messages have been received:


#![allow(unused_variables)]
fn main() {
async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}
}

Iteration and Concurrency

Similar to synchronous Iterators, there are many different ways to iterate over and process the values in a Stream. There are combinator-style methods such as map, filter, and fold, and their early-exit-on-error cousins try_map, try_filter, and try_fold.

Unfortunately, for loops are not usable with Streams, but for imperative-style code, while let and the next/try_next functions can be used:


#![allow(unused_variables)]
fn main() {
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}
}

However, if we're just processing one element at a time, we're potentially leaving behind opportunity for concurrency, which is, after all, why we're writing async code in the first place. To process multiple items from a stream concurrently, use the for_each_concurrent and try_for_each_concurrent methods:


#![allow(unused_variables)]
fn main() {
async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}
}

Executing Multiple Futures at a Time

Up until now, we've mostly executed futures by using .await, which blocks the current task until a particular Future completes. However, real asynchronous applications often need to execute several different operations concurrently.

Executing Multiple Futures at a Time

In this chapter, we'll cover some ways to execute multiple asynchronous operations at the same time:

  • join!: waits for futures to all complete
  • select!: waits for one of several futures to complete
  • Spawning: creates a top-level task which ambiently runs a future to completion
  • FuturesUnordered: a group of futures which yields the result of each subfuture

join!

The futures::join macro makes it possible to wait for multiple different futures to complete while executing them all concurrently.

join!

When performing multiple asynchronous operations, it's tempting to simply .await them in a series:


#![allow(unused_variables)]
fn main() {
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
}

However, this will be slower than necessary, since it won't start trying to get_music until after get_book has completed. In some other languages, futures are ambiently run to completion, so two operations can be run concurrently by first calling the each async fn to start the futures, and then awaiting them both:


#![allow(unused_variables)]
fn main() {
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
}

However, Rust futures won't do any work until they're actively .awaited. This means that the two code snippets above will both run book_future and music_future in series rather than running them concurrently. To correctly run the two futures concurrently, use futures::join!:


#![allow(unused_variables)]
fn main() {
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}
}

The value returned by join! is a tuple containing the output of each Future passed in.

try_join!

For futures which return Result, consider using try_join! rather than join!. Since join! only completes once all subfutures have completed, it'll continue processing other futures even after one of its subfutures has returned an Err.

Unlike join!, try_join! will complete immediately if one of the subfutures returns an error.


#![allow(unused_variables)]
fn main() {
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

Note that the futures passed to try_join! must all have the same error type. Consider using the .map_err(|e| ...) and .err_into() functions from futures::future::TryFutureExt to consolidate the error types:


#![allow(unused_variables)]
fn main() {
use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}
}

select!

The futures::select macro runs multiple futures simultaneously, allowing the user to respond as soon as any future completes.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}

The function above will run both t1 and t2 concurrently. When either t1 or t2 finishes, the corresponding handler will call println!, and the function will end without completing the remaining task.

The basic syntax for select is <pattern> = <expression> => <code>,, repeated for as many futures as you would like to select over.

default => ... and complete => ...

select also supports default and complete branches.

A default branch will run if none of the futures being selected over are yet complete. A select with a default branch will therefore always return immediately, since default will be run if none of the other futures are ready.

complete branches can be used to handle the case where all futures being selected over have completed and will no longer make progress. This is often handy when looping over a select!.


#![allow(unused_variables)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Interaction with Unpin and FusedFuture

One thing you may have noticed in the first example above is that we had to call .fuse() on the futures returned by the two async fns, as well as pinning them with pin_mut. Both of these calls are necessary because the futures used in select must implement both the Unpin trait and the FusedFuture trait.

Unpin is necessary because the futures used by select are not taken by value, but by mutable reference. By not taking ownership of the future, uncompleted futures can be used again after the call to select.

Similarly, the FusedFuture trait is required because select must not poll a future after it has completed. FusedFuture is implemented by futures which track whether or not they have completed. This makes it possible to use select in a loop, only polling the futures which still have yet to complete. This can be seen in the example above, where a_fut or b_fut will have completed the second time through the loop. Because the future returned by future::ready implements FusedFuture, it's able to tell select not to poll it again.

Note that streams have a corresponding FusedStream trait. Streams which implement this trait or have been wrapped using .fuse() will yield FusedFuture futures from their .next() / .try_next() combinators.


#![allow(unused_variables)]
fn main() {
use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}
}

Concurrent tasks in a select loop with Fuse and FuturesUnordered

One somewhat hard-to-discover but handy function is Fuse::terminated(), which allows constructing an empty future which is already terminated, and can later be filled in with a future that needs to be run.

This can be handy when there's a task that needs to be run during a select loop but which is created inside the select loop itself.

Note the use of the .select_next_some() function. This can be used with select to only run the branch for Some(_) values returned from the stream, ignoring Nones.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
}

When many copies of the same future need to be run simultaneously, use the FuturesUnordered type. The following example is similar to the one above, but will run each copy of run_on_new_num_fut to completion, rather than aborting them when a new one is created. It will also print out a value returned by run_on_new_num_fut.


#![allow(unused_variables)]
fn main() {
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

}

Workarounds to Know and Love

Rust's async support is still fairly new, and there are a handful of highly-requested features still under active development, as well as some subpar diagnostics. This chapter will discuss some common pain points and explain how to work around them.

Return Type Errors

In a typical Rust function, returning a value of the wrong type will result in an error that looks something like this:

error[E0308]: mismatched types
 --> src/main.rs:2:12
  |
1 | fn foo() {
  |           - expected `()` because of default return type
2 |     return "foo"
  |            ^^^^^ expected (), found reference
  |
  = note: expected type `()`
             found type `&'static str`

However, the current async fn support doesn't know to "trust" the return type written in the function signature, causing mismatched or even reversed-sounding errors. For example, the function async fn foo() { "foo" } results in this error:

error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
 --> src/lib.rs:1:16
  |
1 | async fn foo() {
  |                ^ expected &str, found ()
  |
  = note: expected type `&str`
             found type `()`
  = note: the return type of a function must have a statically known size

The error says that it expected &str and found (), which is actually the exact opposite of what you'd want. This is because the compiler is incorrectly trusting the function body to return the correct type.

The workaround for this issue is to recognize that errors pointing to the function signature with the message "expected SomeType, found OtherType" usually indicate that one or more return sites are incorrect.

A fix to this issue is being tracked in this bug.

Box<dyn Trait>

Similarly, because the return type from the function signature is not propagated down correctly, values returned from async fn aren't correctly coerced to their expected type.

In practice, this means that returning Box<dyn Trait> objects from an async fn requires manually as-casting from Box<MyType> to Box<dyn Trait>.

This code will result in an error:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo")
}

This issue can be worked around by manually casting using as:

async fn x() -> Box<dyn std::fmt::Display> {
    Box::new("foo") as Box<dyn std::fmt::Display>
}

A fix to this issue is being tracked in this bug.

? in async Blocks

Just as in async fn, it's common to use ? inside async blocks. However, the return type of async blocks isn't explicitly stated. This can cause the compiler to fail to infer the error type of the async block.

For example, this code:


#![allow(unused_variables)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};
}

will trigger this error:

error[E0282]: type annotations needed
 --> src/main.rs:5:9
  |
4 |     let fut = async {
  |         --- consider giving `fut` a type
5 |         foo().await?;
  |         ^^^^^^^^^^^^ cannot infer type

Unfortunately, there's currently no way to "give fut a type", nor a way to explicitly specify the return type of an async block. To work around this, use the "turbofish" operator to supply the success and error types for the async block:


#![allow(unused_variables)]
fn main() {
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
}

Send Approximation

Some async fn state machines are safe to be sent across threads, while others are not. Whether or not an async fn Future is Send is determined by whether a non-Send type is held across an .await point. The compiler does its best to approximate when values may be held across an .await point, but this analysis is too conservative in a number of places today.

For example, consider a simple non-Send type, perhaps a type which contains an Rc:


#![allow(unused_variables)]
fn main() {
use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);
}

Variables of type NotSend can briefly appear as temporaries in async fns even when the resulting Future type returned by the async fn must be Send:

use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

However, if we change foo to store NotSend in a variable, this example no longer compiles:


#![allow(unused_variables)]
fn main() {
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
}
error[E0277]: `std::rc::Rc<()>` cannot be sent between threads safely
  --> src/main.rs:15:5
   |
15 |     require_send(foo());
   |     ^^^^^^^^^^^^ `std::rc::Rc<()>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<()>`
   = note: required because it appears within the type `NotSend`
   = note: required because it appears within the type `{NotSend, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:7:16: 10:2 {NotSend, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `require_send`
  --> src/main.rs:12:1
   |
12 | fn require_send(_: impl Send) {}
   | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.

This error is correct. If we store x into a variable, it won't be dropped until after the .await, at which point the async fn may be running on a different thread. Since Rc is not Send, allowing it to travel across threads would be unsound. One simple solution to this would be to drop the Rc before the .await, but unfortunately that does not work today.

In order to successfully work around this issue, you may have to introduce a block scope encapsulating any non-Send variables. This makes it easier for the compiler to tell that these variables do not live across an .await point.


#![allow(unused_variables)]
fn main() {
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
}

Recursion

Internally, async fn creates a state machine type containing each sub-Future being .awaited. This makes recursive async fns a little tricky, since the resulting state machine type has to contain itself:


#![allow(unused_variables)]
fn main() {
// This function:
async fn foo() {
    step_one().await;
    step_two().await;
}
// generates a type like this:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// So this function:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// generates a type like this:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}
}

This won't work-- we've created an infinitely-sized type! The compiler will complain:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

In order to allow this, we have to introduce an indirection using Box. Unfortunately, compiler limitations mean that just wrapping the calls to recursive() in Box::pin isn't enough. To make this work, we have to make recursive into a non-async function which returns a .boxed() async block:


#![allow(unused_variables)]
fn main() {
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}
}

async in Traits

Currently, async fn cannot be used in traits. The reasons for this are somewhat complex, but there are plans to remove this restriction in the future.

In the meantime, however, this can be worked around using the async_trait crate from crates.io.

Note that using these trait methods will result in a heap allocation per-function-call. This is not a significant cost for the vast majority of applications, but should be considered when deciding whether to use this functionality in the public API of a low-level function that is expected to be called millions of times a second.