Handling Connections Concurrently

The problem with our code so far is that listener.incoming() is a blocking iterator. The executor can't run other futures while listener waits on incoming connections, and we can't handle a new connection until we're done with the previous one.

In order to fix this, we'll transform listener.incoming() from a blocking Iterator to a non-blocking Stream. Streams are similar to Iterators, but can be consumed asynchronously. For more information, see the chapter on Streams.

Let's replace our blocking std::net::TcpListener with the non-blocking async_std::net::TcpListener, and update our connection handler to accept an async_std::net::TcpStream:

use async_std::prelude::*;

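async fn handle_connection(mut stream: TcpStream) {
    // Read up to the first 1024 bytes of the request without blocking the thread.
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    // write_all keeps writing until the whole response is sent; a single write may stop early.
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}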

In the asynchronous version of TcpListener, listener.incoming() returns a type that implements the Stream trait, a change which provides two benefits. The first is that listener.incoming() no longer blocks the executor. The executor can now yield to other pending futures while there are no incoming TCP connections to be processed.

The second benefit is that elements from the Stream can optionally be processed concurrently, using the for_each_concurrent method. Here, we'll take advantage of this method to handle each incoming request concurrently. The method is provided by the StreamExt extension trait in the futures crate, so we'll need to import that trait, and our Cargo.toml now looks like this:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

Now, we can handle each connection concurrently by passing handle_connection in through a closure. The closure takes ownership of each TcpStream, and is run as soon as a new TcpStream becomes available. As long as handle_connection does not block, a slow request will no longer prevent other requests from completing.

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

Serving Requests in Parallel

Our example so far has largely presented cooperative multitasking (using async code) as an alternative to preemptive multitasking (using threads). However, async code and threads are not mutually exclusive. In our example, for_each_concurrent processes each connection concurrently, but on the same thread. The async-std crate allows us to spawn tasks onto separate threads as well. Because the future returned by handle_connection is Send and does not block, it's safe to use with async_std::task::spawn. Here's what that would look like:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

Now we are using both cooperative multitasking concurrency and preemptive multitasking to handle multiple requests at the same time! See the section on multithreaded executors for more information.