select!
The futures::select macro runs multiple futures concurrently, allowing the user to respond as soon as any future completes.
    use futures::{
        future::FutureExt, // for `.fuse()`
        pin_mut,
        select,
    };

    async fn task_one() { /* ... */ }
    async fn task_two() { /* ... */ }

    async fn race_tasks() {
        let t1 = task_one().fuse();
        let t2 = task_two().fuse();

        pin_mut!(t1, t2);

        select! {
            () = t1 => println!("task one completed first"),
            () = t2 => println!("task two completed first"),
        }
    }
The function above runs both t1 and t2 concurrently. When either t1 or t2 finishes, the corresponding handler calls println!, and the function ends without completing the remaining task.
The basic syntax for select is <pattern> = <expression> => <code>, repeated for as many futures as you would like to select over.
default => ... and complete => ...
select also supports default and complete branches.
A default branch runs if none of the futures being selected over is complete yet. A select with a default branch therefore always returns immediately, since default runs whenever none of the other futures is ready.
complete branches handle the case where all futures being selected over have completed and will no longer make progress. This is often handy when looping over a select!.
    use futures::{future, select};

    async fn count() {
        let mut a_fut = future::ready(4);
        let mut b_fut = future::ready(6);
        let mut total = 0;

        loop {
            select! {
                a = a_fut => total += a,
                b = b_fut => total += b,
                complete => break,
                default => unreachable!(), // never runs (futures are ready, then complete)
            };
        }
        assert_eq!(total, 10);
    }
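The way the ready, default, and complete outcomes interact in the loop above can be modeled without the futures crate. What follows is a minimal std-only sketch, not the macro's actual expansion. The names `Outcome`, `select_pass`, `count_sketch`, and `NoopWake` are hypothetical. A finished future is represented by an emptied `Option` slot, and the slots are polled in order for simplicity, whereas futures::select! picks pseudo-randomly among the branches that are ready.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// What a single pass over all branches decided (hypothetical model).
enum Outcome {
    Ready(u32), // some future finished with this value
    Default,    // futures remain, but none was ready on this pass
    Complete,   // every future has already finished
}

/// One modeled `select!` pass. A slot holding `None` has already completed
/// and is skipped instead of being polled again.
fn select_pass<F>(slots: &mut [Option<F>], cx: &mut Context<'_>) -> Outcome
where
    F: Future<Output = u32> + Unpin,
{
    let mut any_live = false;
    for slot in slots.iter_mut() {
        let polled = match slot.as_mut() {
            Some(fut) => Pin::new(fut).poll(cx),
            None => continue,
        };
        any_live = true;
        if let Poll::Ready(value) = polled {
            *slot = None; // remember that this future is finished
            return Outcome::Ready(value);
        }
    }
    if any_live { Outcome::Default } else { Outcome::Complete }
}

/// Mirrors the `count` example: returns (total, times `default` was taken).
fn count_sketch() -> (u32, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut slots = [Some(std::future::ready(4u32)), Some(std::future::ready(6u32))];
    let (mut total, mut defaults) = (0, 0);
    loop {
        match select_pass(&mut slots, &mut cx) {
            Outcome::Ready(value) => total += value,
            Outcome::Default => defaults += 1,
            Outcome::Complete => break,
        }
    }
    (total, defaults)
}
```

Under these assumptions, `count_sketch()` sums both values and never takes the default outcome, which matches the `unreachable!()` in the example above: each future is ready on its first poll, and once both slots are empty only the complete outcome remains.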
Interaction with Unpin and FusedFuture
One thing you may have noticed in the first example above is that we had to call .fuse() on the futures returned by the two async fns, as well as pin them with pin_mut. Both of these calls are necessary because the futures used in select must implement both the Unpin trait and the FusedFuture trait.
Unpin is necessary because the futures used by select are not taken by value, but by mutable reference. Because select does not take ownership of the futures, uncompleted futures can be used again after the call to select.
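To make the by-reference point concrete, here is a minimal std-only sketch rather than the macro's real expansion. The names `Countdown`, `poll_by_ref`, `race_by_ref`, `race_then_finish`, and `NoopWake` are hypothetical. It shows that polling through `&mut F` goes through `Pin::new`, which is only available for `Unpin` targets, and that the future which lost the race is still owned by the caller afterwards.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for the busy-polling loops below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical test future: pending for `remaining` polls, then ready.
/// (A real future would also arrange a wake-up before returning `Pending`.)
struct Countdown {
    remaining: u32,
    value: u32,
}

impl Future for Countdown {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.value)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Polls through a mutable reference. `Pin::new` only accepts pointers to
/// `Unpin` targets, which is where the `Unpin` bound comes from.
fn poll_by_ref<F: Future + Unpin>(fut: &mut F, cx: &mut Context<'_>) -> Poll<F::Output> {
    Pin::new(fut).poll(cx)
}

/// Borrows both futures, polls them in turn, and reports the first to finish.
fn race_by_ref<A, B>(a: &mut A, b: &mut B) -> (&'static str, u32)
where
    A: Future<Output = u32> + Unpin,
    B: Future<Output = u32> + Unpin,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = poll_by_ref(&mut *a, &mut cx) {
            return ("a", v);
        }
        if let Poll::Ready(v) = poll_by_ref(&mut *b, &mut cx) {
            return ("b", v);
        }
    }
}

/// Races two futures, then keeps driving the loser, which was only borrowed.
fn race_then_finish() -> ((&'static str, u32), u32) {
    let mut slow = Countdown { remaining: 3, value: 1 };
    let mut fast = Countdown { remaining: 1, value: 2 };
    let winner = race_by_ref(&mut slow, &mut fast);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let leftover = loop {
        if let Poll::Ready(v) = poll_by_ref(&mut slow, &mut cx) {
            break v;
        }
    };
    (winner, leftover)
}
```

In this sketch `fast` wins the race, and `slow` is then polled to completion from where it left off, because `race_by_ref` never took ownership of it.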
Similarly, the FusedFuture trait is required because select must not poll a future after it has completed. FusedFuture is implemented by futures which track whether or not they have completed. This makes it possible to use select in a loop, only polling the futures which have yet to complete. This can be seen in the example above, where a_fut or b_fut will have completed the second time through the loop. Because the future returned by future::ready implements FusedFuture, it's able to tell select not to poll it again.
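The idea of a future that tracks its own completion can be sketched with std alone. `SketchFuse`, `poll_twice`, and `NoopWake` below are hypothetical names, and this is an illustration of the concept under simplifying assumptions (the inner future must be `Unpin`), not the futures crate's implementation of `Fuse`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the inner future here is ready immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical fused wrapper: drops the inner future once it finishes,
/// so the inner future is never polled after completion.
struct SketchFuse<F> {
    inner: Option<F>,
}

impl<F> SketchFuse<F> {
    fn new(fut: F) -> Self {
        SketchFuse { inner: Some(fut) }
    }

    /// True once the inner future has completed.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future + Unpin> Future for SketchFuse<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let result = match self.inner.as_mut() {
            Some(fut) => Pin::new(fut).poll(cx),
            // Already terminated: report `Pending` instead of polling again.
            None => return Poll::Pending,
        };
        if result.is_ready() {
            self.inner = None;
        }
        result
    }
}

/// Polls a fused ready-future twice and reports what happened each time.
fn poll_twice() -> (Poll<u8>, bool, Poll<u8>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Polling `std::future::ready` directly a second time would panic.
    let mut fused = SketchFuse::new(std::future::ready(4u8));
    let first = Pin::new(&mut fused).poll(&mut cx);
    let terminated = fused.is_terminated();
    let second = Pin::new(&mut fused).poll(&mut cx);
    (first, terminated, second)
}
```

A caller that checks `is_terminated()` before polling, as a select loop is described as doing above, can skip the finished future entirely. The second poll in `poll_twice` only shows that an accidental extra poll is harmless in this sketch.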
Note that streams have a corresponding FusedStream trait. Streams which implement this trait or have been wrapped using .fuse() will yield FusedFuture futures from their .next() / .try_next() combinators.
    use futures::{
        stream::{Stream, StreamExt, FusedStream},
        select,
    };

    async fn add_two_streams(
        mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
        mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
    ) -> u8 {
        let mut total = 0;

        loop {
            let item = select! {
                x = s1.next() => x,
                x = s2.next() => x,
                complete => break,
            };
            if let Some(next_num) = item {
                total += next_num;
            }
        }

        total
    }
Concurrent tasks in a select loop with Fuse and FuturesUnordered
One somewhat hard-to-discover but handy function is Fuse::terminated(), which constructs an empty future that is already terminated and can later be filled in with a future that needs to be run. This is useful when a task needs to run during a select loop but is created inside the select loop itself.
Note the use of the .select_next_some() function. This can be used with select to only run the branch for Some(_) values returned from the stream, ignoring Nones.
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        stream::{FusedStream, Stream, StreamExt},
        pin_mut,
        select,
    };

    async fn get_new_num() -> u8 { /* ... */ 5 }

    async fn run_on_new_num(_: u8) { /* ... */ }

    async fn run_loop(
        mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
        starting_num: u8,
    ) {
        let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
        let get_new_num_fut = Fuse::terminated();
        pin_mut!(run_on_new_num_fut, get_new_num_fut);
        loop {
            select! {
                () = interval_timer.select_next_some() => {
                    // The timer has elapsed. Start a new `get_new_num_fut`
                    // if one was not already running.
                    if get_new_num_fut.is_terminated() {
                        get_new_num_fut.set(get_new_num().fuse());
                    }
                },
                new_num = get_new_num_fut => {
                    // A new number has arrived -- start a new `run_on_new_num_fut`,
                    // dropping the old one.
                    run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
                },
                // Run the `run_on_new_num_fut`
                () = run_on_new_num_fut => {},
                // panic if everything completed, since the `interval_timer` should
                // keep yielding values indefinitely.
                complete => panic!("`interval_timer` completed unexpectedly"),
            }
        }
    }
When many copies of the same future need to be run simultaneously, use the FuturesUnordered type. The following example is similar to the one above, but will run each copy of run_on_new_num_fut to completion, rather than aborting them when a new one is created. It will also print out a value returned by run_on_new_num_fut.
    use futures::{
        future::{Fuse, FusedFuture, FutureExt},
        stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
        pin_mut,
        select,
    };

    async fn get_new_num() -> u8 { /* ... */ 5 }

    async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

    async fn run_loop(
        mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
        starting_num: u8,
    ) {
        let mut run_on_new_num_futs = FuturesUnordered::new();
        run_on_new_num_futs.push(run_on_new_num(starting_num));
        let get_new_num_fut = Fuse::terminated();
        pin_mut!(get_new_num_fut);
        loop {
            select! {
                () = interval_timer.select_next_some() => {
                    // The timer has elapsed. Start a new `get_new_num_fut`
                    // if one was not already running.
                    if get_new_num_fut.is_terminated() {
                        get_new_num_fut.set(get_new_num().fuse());
                    }
                },
                new_num = get_new_num_fut => {
                    // A new number has arrived -- start a new `run_on_new_num_fut`.
                    run_on_new_num_futs.push(run_on_new_num(new_num));
                },
                // Run the `run_on_new_num_futs` and check if any have completed
                res = run_on_new_num_futs.select_next_some() => {
                    println!("run_on_new_num_fut returned {:?}", res);
                },
                // panic if everything completed, since the `interval_timer` should
                // keep yielding values indefinitely.
                complete => panic!("`interval_timer` completed unexpectedly"),
            }
        }
    }