Handling Connections Concurrently
The problem with our code so far is that listener.incoming()
is a blocking iterator.
The executor can't run other futures while listener
waits on incoming connections,
and we can't handle a new connection until we're done with the previous one.
In order to fix this, we'll transform listener.incoming()
from a blocking Iterator
to a non-blocking Stream. Streams are similar to Iterators, but can be consumed asynchronously.
For more information, see the chapter on Streams.
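As a quick illustration of that similarity, here's a minimal sketch that consumes a Stream the way a loop would consume an Iterator. It uses the futures crate, which we'll add to our dependencies below:

use futures::stream::{self, StreamExt};

#[async_std::main]
async fn main() {
    // `stream::iter` turns an ordinary iterator into a Stream.
    let mut numbers = stream::iter(vec![1, 2, 3]);

    // Unlike `Iterator::next()`, `Stream::next()` returns a future:
    // awaiting it lets the executor run other work until an item is ready.
    while let Some(n) = numbers.next().await {
        println!("{}", n);
    }
}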
Let's replace our blocking std::net::TcpListener with the non-blocking async_std::net::TcpListener, and update our connection handler to accept an async_std::net::TcpStream:
use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}
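If you'd like a version of the handler that compiles on its own, here's a sketch with a hard-coded response standing in for the snipped logic (the full example builds the response by inspecting the request):

use async_std::net::TcpStream;
use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    // Hypothetical fixed response; the full example inspects the
    // request in `buffer` and chooses the response accordingly.
    let response = "HTTP/1.1 200 OK\r\n\r\nhello";

    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}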
The asynchronous version of TcpListener implements the Stream trait for listener.incoming(), a change which provides two benefits.
The first is that listener.incoming()
no longer blocks the executor.
The executor can now yield to other pending futures
while there are no incoming TCP connections to be processed.
The second benefit is that elements from the Stream can optionally be processed concurrently,
using a Stream's for_each_concurrent
method.
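To see the shape of for_each_concurrent in isolation before wiring it into the server, here's a minimal sketch over a stream of numbers (the sleep is only there to simulate per-item work):

use std::time::Duration;

use async_std::task;
use futures::stream::{self, StreamExt};

#[async_std::main]
async fn main() {
    stream::iter(vec![1, 2, 3, 4])
        // `None` places no limit on how many items are processed at once.
        .for_each_concurrent(/* limit */ None, |n| async move {
            task::sleep(Duration::from_millis(10)).await;
            println!("finished {}", n);
        })
        .await;
}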
Here, we'll take advantage of this method to handle each incoming request concurrently.
We'll need to import the StreamExt trait from the futures crate, which provides the for_each_concurrent method, so our Cargo.toml now looks like this:
[dependencies]
futures = "0.3"

[dependencies.async-std]
version = "1.6"
features = ["attributes"]
Now, we can handle each connection concurrently by passing handle_connection in through a closure. The closure takes ownership of each TcpStream and runs as soon as a new TcpStream becomes available. As long as handle_connection does not block, a slow request will no longer prevent other requests from completing.
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}
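The first argument to for_each_concurrent is an optional limit on the number of connections processed at once; we pass None here, meaning no cap. If unbounded concurrency is a concern, a hypothetical variant of main might bound it (100 is an arbitrary illustrative value, not a recommendation):

use async_std::net::TcpListener;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        // At most 100 connections are handled concurrently; further
        // incoming connections wait until a slot frees up.
        .for_each_concurrent(/* limit */ Some(100), |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}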
Serving Requests in Parallel
Our example so far has largely presented cooperative multitasking concurrency (using async code)
as an alternative to preemptive multitasking (using threads).
However, async code and threads are not mutually exclusive.
In our example, for_each_concurrent
processes each connection concurrently, but on the same thread.
The async-std
crate allows us to spawn tasks onto separate threads as well.
Because handle_connection is both Send and non-blocking, it's safe to use with async_std::task::spawn. Here's what that would look like:
use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}
Now we are using both cooperative multitasking concurrency and preemptive multitasking to handle multiple requests at the same time! See the section on multithreaded executors for more information.