Note: fahrenheit was previously called “toykio”, but the name caused great confusion among Reddit users.
In this blog post I’d like to present fahrenheit, a simple futures executor intended for learning how executors with an event loop work. Fahrenheit provides only a very minimal feature set: an event loop, TCP streams, and TCP listeners. However, because futures are composable, it turns out that this is enough to build complex clients and servers.
In the following, I’d like to give you a quick overview of fahrenheit’s components.
AsyncTcpStream
Fahrenheit defines the AsyncTcpStream type, a newtype wrapper around TcpStream from the standard library. Just like its std counterpart, the connect function opens a connection, but it also puts the socket into non-blocking mode. This means that the read() and write() methods will return immediately, either with data or with an error. If there’s not enough data (for reads) or buffer space (for writes) available, a special kind of error, WouldBlock, is returned. We’ll discuss how to handle it in the next section.
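To make this concrete, here is a minimal sketch of what such a wrapper could look like. This is an illustration, not fahrenheit’s exact code:

use std::io;
use std::net::{TcpStream, ToSocketAddrs};

// A newtype wrapper: the inner TcpStream is accessible as self.0
struct AsyncTcpStream(TcpStream);

impl AsyncTcpStream {
    fn connect(addr: impl ToSocketAddrs) -> io::Result<AsyncTcpStream> {
        let inner = TcpStream::connect(addr)?;
        // The crucial difference from std: switch the socket to
        // non-blocking mode, so reads and writes never block
        inner.set_nonblocking(true)?;
        Ok(AsyncTcpStream(inner))
    }
}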
AsyncRead & AsyncWrite
The AsyncRead and AsyncWrite traits are the foundation of all I/O futures. The AsyncReadExt and AsyncWriteExt extension methods (like read and write_all) are built on top of them. These traits provide a way to connect futures to an event loop while keeping them independent from any particular event loop implementation.
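Judging from the implementation shown below, the trait definitions look roughly like this (a sketch; fahrenheit’s actual definitions may differ in details):

use std::io::Error;
use std::task::{Context, Poll};

trait AsyncRead {
    // Attempt a read; return Pending (after registering a waker)
    // if no data is available yet
    fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, Error>>;
}

trait AsyncWrite {
    // Attempt a write; return Pending (after registering a waker)
    // if the socket buffer is full
    fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, Error>>;
}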
Let’s take a look at the AsyncRead implementation for AsyncTcpStream:
impl AsyncRead for AsyncTcpStream {
    fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, Error>> {
        match self.0.read(buf) {
            // Data was already available, so the future is immediately ready
            Ok(len) => Poll::Ready(Ok(len)),
            Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // Get the TcpStream's file descriptor
                let fd = self.0.as_raw_fd();
                let waker = cx.waker();
                // Ask the reactor to wake this task once the fd becomes readable
                REACTOR.with(|reactor| reactor.add_read_interest(fd, waker.clone()));
                Poll::Pending
            }
            // Any other error is unexpected in this toy implementation
            Err(err) => panic!("error {:?}", err),
        }
    }
}
It tries to perform a read on the underlying TcpStream. If the read succeeds, the slice is filled with data. If, however, the read fails with a WouldBlock error, it registers the current task’s waker so that the task is woken up when data becomes available. More about this in the next section. The AsyncWrite implementation does the analogous thing for writes.
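For completeness, here is a sketch of what that poll_write implementation might look like, assuming an add_write_interest method analogous to add_read_interest (the method name is my assumption; fahrenheit’s actual code may differ):

impl AsyncWrite for AsyncTcpStream {
    fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, Error>> {
        match self.0.write(buf) {
            Ok(len) => Poll::Ready(Ok(len)),
            Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                let fd = self.0.as_raw_fd();
                let waker = cx.waker();
                // The mirror image of poll_read: register interest in writable events
                REACTOR.with(|reactor| reactor.add_write_interest(fd, waker.clone()));
                Poll::Pending
            }
            Err(err) => panic!("error {:?}", err),
        }
    }
}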
Event loop
The EventLoop (often also called a “reactor”) is the core of the executor. It is defined like this:
struct InnerEventLoop {
    read: RefCell<BTreeMap<RawFd, Waker>>,
    write: RefCell<BTreeMap<RawFd, Waker>>,
    counter: Cell<usize>,
    wait_queue: RefCell<BTreeMap<TaskId, Task>>,
    run_queue: RefCell<VecDeque<Wakeup>>,
}
- read and write are BTreeMaps which map file descriptors to wakers
- wait_queue stores blocked tasks which are waiting for an event
- and run_queue stores wake-up notifications
The event loop provides methods to register (and remove) interest in read and write events. Let’s look at what add_read_interest does:
fn add_read_interest(&self, fd: RawFd, waker: Waker) {
    if !self.read.borrow().contains_key(&fd) {
        self.read.borrow_mut().insert(fd, waker);
    }
}
But it just inserts fd and waker into the read map! So where does all the magic happen?

In the main loop.

The event loop is called a loop for a reason. Let’s take a look:
loop {
    // Event loop iteration timeout. If no descriptor is ready we continue iterating
    let mut tv: timeval = timeval {
        tv_sec: 1,
        tv_usec: 0,
    };

    // Initialize fd_sets (file descriptor sets)
    let mut read_fds: fd_set = unsafe { std::mem::zeroed() };
    let mut write_fds: fd_set = unsafe { std::mem::zeroed() };

    unsafe { FD_ZERO(&mut read_fds) };
    unsafe { FD_ZERO(&mut write_fds) };
Woah, woah, a lot of unsafe here! Well, don’t worry, it’s just how C FFI works. We need to initialize some C structures: a timeout and the fd_sets. They will be passed to the select(2) function later.
    // Highest fd plus one, as required by select()
    let mut nfds = 0;

    // Add read interests to the read fd_set
    for fd in self.read.borrow().keys() {
        unsafe { FD_SET(*fd, &mut read_fds as *mut fd_set) };
        nfds = std::cmp::max(nfds, fd + 1);
    }

    // Add write interests to the write fd_set
    for fd in self.write.borrow().keys() {
        unsafe { FD_SET(*fd, &mut write_fds as *mut fd_set) };
        nfds = std::cmp::max(nfds, fd + 1);
    }
Here we add the file descriptors from our read and write maps to the fd_sets.
    // `select` will block until some event happens on the fds or the timeout triggers
    let rv = unsafe {
        select(
            nfds,
            &mut read_fds,
            &mut write_fds,
            std::ptr::null_mut(),
            &mut tv,
        )
    };

    // We don't try to recover from errors here: just panic
    if rv == -1 {
        panic!("select()");
    } else if rv == 0 {
        debug!("timeout");
    } else {
        debug!("data available on {} fds", rv);
    }
Finally we call select with the arguments we prepared earlier. select() accepts three fd_sets (we ignore the third one, for exceptional conditions, in this example) and a timeout, and it returns a positive number when one or more of the file descriptors in the sets are ready. We should then go and find out which ones they were!
    // Check which fd it was and put the appropriate future on the run queue
    for (fd, waker) in self.read.borrow().iter() {
        let is_set = unsafe { FD_ISSET(*fd, &mut read_fds as *mut fd_set) };
        if is_set {
            waker.wake();
        }
    }

    // Same for writes
    for (fd, waker) in self.write.borrow().iter() {
        let is_set = unsafe { FD_ISSET(*fd, &mut write_fds as *mut fd_set) };
        if is_set {
            waker.wake();
        }
    }
We go through our maps again and check which file descriptors are set (ready) in the fd_sets. For each one that is, we call wake on its associated waker, which in turn puts a Wakeup event on the ready-to-run queue.
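How does wake know where to put the notification? One way to build such a waker is via the standard Wake trait. The sketch below is hypothetical and simplified: fahrenheit’s single-threaded reactor keeps its run queue in a RefCell and its Wakeup also carries the waker itself, while a std Waker must be Send + Sync, so this version uses a Mutex-protected queue instead:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

// Hypothetical stand-in for fahrenheit's Wakeup
// (the real one also carries the waker itself)
struct Wakeup {
    index: usize,
}

struct ReactorWaker {
    index: usize,
    run_queue: Arc<Mutex<VecDeque<Wakeup>>>,
}

impl Wake for ReactorWaker {
    fn wake(self: Arc<Self>) {
        // Waking doesn't poll anything; it only enqueues a notification
        // that the main loop will pick up on its next iteration
        self.run_queue
            .lock()
            .unwrap()
            .push_back(Wakeup { index: self.index });
    }
}

fn make_waker(index: usize, run_queue: Arc<Mutex<VecDeque<Wakeup>>>) -> Waker {
    Waker::from(Arc::new(ReactorWaker { index, run_queue }))
}

Back in the main loop, the woken tasks now need to be polled: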
    let mut tasks_done = Vec::new();

    // Now pop tasks from the run queue and poll them
    while let Some(wakeup) = self.run_queue.borrow_mut().pop_front() {
        let mut handle = self.handle();

        if let Some(ref mut task) = self.wait_queue.borrow_mut().get_mut(&wakeup.index) {
            // If a task returns `Poll::Ready`, we're done with it
            if task.poll(wakeup.waker, &mut handle).is_ready() {
                tasks_done.push(wakeup.index);
            }
        }
    }

    // Remove completed tasks
    for idx in tasks_done {
        self.wait_queue.borrow_mut().remove(&idx);
    }

    // Stop the loop if there are no more tasks in the `wait_queue`
    if self.wait_queue.borrow().is_empty() {
        return;
    }
}
We drain the run_queue, look each task up by its index in the wait_queue, and poll it. Tasks that are ready (done) are removed from the wait_queue.
The life of a future
In this section, I’d like to recap how a future (let’s take Read as an example) is executed on the event loop:
- first, it’s created using the read() method on AsyncTcpStream. This method is available for all types that implement the AsyncRead trait
- then it’s spawned on the executor, using either the run() or the spawn() method
- the executor calls the poll() method on the future. The poll implementation for Read calls poll_read() on the AsyncTcpStream, which registers its interest in readable events
- when an event occurs, the future is polled again. This cycle repeats until the future is ready
Thank you
Thanks to the whole futures team, and especially to @aturon for encouragement and @MajorBreakfast for edits.
That’s all for today! You can find fahrenheit on GitHub and crates.io. Happy hacking!