Note: fahrenheit was previously called “toykio”, but that name caused great confusion amongst Reddit users.

In this blog post I’d like to present fahrenheit, a simple futures executor intended for learning how executors with an event loop work. Fahrenheit provides only a minimal feature set: an event loop plus TCP streams and listeners. However, because futures are composable, this turns out to be enough to build complex clients and servers.

In the following, I’d like to give you a quick overview of fahrenheit’s components.

AsyncTcpStream

Fahrenheit defines the AsyncTcpStream type, a newtype wrapper around TcpStream from the standard library. Just like in std, the connect function opens a connection, but it additionally sets the socket to non-blocking mode. This means that the read() and write() methods return immediately, either with a result or with an error. If no data is available (for reads) or there is no buffer space (for writes), a special kind of error - WouldBlock - is returned. We’ll discuss how to handle it in the next section.
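To see WouldBlock in action without fahrenheit, here is a small sketch that uses only the standard library. The helper name read_kind_on_idle_socket is made up for this example; it connects to a local listener, switches the socket to non-blocking mode, and reads before the peer has sent anything.

```rust
use std::io::{Error, ErrorKind, Read};
use std::net::{TcpListener, TcpStream};

/// Hypothetical helper: reports the error kind of a non-blocking read
/// on a connection where the peer has not sent any data yet.
fn read_kind_on_idle_socket() -> std::io::Result<ErrorKind> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut stream = TcpStream::connect(listener.local_addr()?)?;
    // Keep the accepted peer alive so the connection stays open but silent.
    let (_peer, _addr) = listener.accept()?;

    // This is the step that makes read() return immediately.
    stream.set_nonblocking(true)?;

    let mut buf = [0u8; 16];
    match stream.read(&mut buf) {
        Ok(n) => Err(Error::new(
            ErrorKind::Other,
            format!("unexpected read of {} bytes", n),
        )),
        Err(err) => Ok(err.kind()),
    }
}
```

Since nothing was written to the socket, the read cannot make progress and the function should report ErrorKind::WouldBlock instead of blocking the thread.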

AsyncRead & AsyncWrite

The AsyncRead and AsyncWrite traits are the foundation of all I/O futures. The AsyncReadExt and AsyncWriteExt extension methods (like read and write_all) are built on top of them. These traits provide a way to connect futures to an event loop while keeping them independent from any particular event loop implementation.

Let’s take a look at the AsyncRead implementation for AsyncTcpStream:

impl AsyncRead for AsyncTcpStream {
    fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, Error>> {
        match self.0.read(buf) {
            Ok(len) => Poll::Ready(Ok(len)),
            Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // Get TcpStream file descriptor
                let fd = self.0.as_raw_fd();
                let waker = cx.waker();

                REACTOR.with(|reactor| reactor.add_read_interest(fd, waker.clone()));

                Poll::Pending
            }
            // Any other error is treated as fatal in this minimal implementation
            Err(err) => panic!("error {:?}", err),
        }
    }
}

It tries to perform a read on the underlying TcpStream. If the read succeeds, the slice is filled with data and the number of bytes read is returned. If, however, the read fails with a WouldBlock error, it registers the current task’s waker with the reactor so that the task is woken when data becomes available, and returns Poll::Pending. More about this in the next section.

The AsyncWrite implementation does something analogous for writes.
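Fahrenheit’s actual AsyncWrite code isn’t shown here, so the following is only a sketch of what “analogous” could look like, written against the standard library. Everything in it is an assumption made for illustration: the Fd alias, the WRITE_INTERESTS thread-local (a stand-in for the write map of the reactor), poll_write_sketch, and the FullWriter test double. Unlike the read implementation above, it returns unexpected errors instead of panicking.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io::{self, ErrorKind, Write};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical file descriptor type (a plain alias keeps the sketch portable).
type Fd = i32;

thread_local! {
    /// Hypothetical stand-in for the reactor's map of write interests.
    static WRITE_INTERESTS: RefCell<BTreeMap<Fd, Waker>> = RefCell::new(BTreeMap::new());
}

/// Tries to write; on WouldBlock, registers the waker and reports Pending.
fn poll_write_sketch<W: Write>(
    io: &mut W,
    fd: Fd,
    cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<io::Result<usize>> {
    match io.write(buf) {
        Ok(len) => Poll::Ready(Ok(len)),
        Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
            let waker = cx.waker().clone();
            WRITE_INTERESTS.with(|map| map.borrow_mut().insert(fd, waker));
            Poll::Pending
        }
        // Assumption: hand other errors back to the caller.
        Err(err) => Poll::Ready(Err(err)),
    }
}

/// Test double: a writer whose buffer is always full.
struct FullWriter;

impl Write for FullWriter {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(ErrorKind::WouldBlock.into())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// A waker that does nothing, just enough to build a Context.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Polling FullWriter through poll_write_sketch yields Poll::Pending and leaves a waker in the map, whereas a Vec<u8> (which always accepts writes) yields Poll::Ready with the number of bytes written.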

Event loop

The EventLoop (often also called “reactor”) is the core of the executor. It is defined like this:

struct InnerEventLoop {
    read: RefCell<BTreeMap<RawFd, Waker>>,
    write: RefCell<BTreeMap<RawFd, Waker>>,
    counter: Cell<usize>,
    wait_queue: RefCell<BTreeMap<TaskId, Task>>,
    run_queue: RefCell<VecDeque<Wakeup>>,
}
  • read and write are BTreeMaps which map file descriptors to wakers,
  • wait_queue stores blocked tasks which are waiting for an event,
  • and run_queue stores wake-up notifications.

The event loop provides methods to register (and remove) interest in read and write events. Let’s look at what add_read_interest does:

fn add_read_interest(&self, fd: RawFd, waker: Waker) {
    if !self.read.borrow().contains_key(&fd) {
        self.read.borrow_mut().insert(fd, waker);
    }
}

But it just inserts fd and waker into the read map! So where does all the magic happen? In the main loop. The event loop is called a loop for a reason. Let’s take a look:

loop {
    // Event loop iteration timeout. If no descriptor is ready we continue iterating
    let mut tv: timeval = timeval {
        tv_sec: 1,
        tv_usec: 0,
    };

    // Initialize fd_sets (file descriptor sets)
    let mut read_fds: fd_set = unsafe { std::mem::zeroed() };
    let mut write_fds: fd_set = unsafe { std::mem::zeroed() };

    unsafe { FD_ZERO(&mut read_fds) };
    unsafe { FD_ZERO(&mut write_fds) };

    // Highest-numbered file descriptor plus one, as required by select()
    let mut nfds = 0;

Woah, woah, a lot of unsafe here! Well, don’t worry, it’s just how C FFI works. We need to initialize some C structures: a timeout and the fd_sets. They will be passed to the select(2) function later.

    // Add read interests to read fd_sets
    for fd in self.read.borrow().keys() {
        unsafe { FD_SET(*fd, &mut read_fds as *mut fd_set) };
        nfds = std::cmp::max(nfds, fd + 1);
    }

    // Add write interests to write fd_sets
    for fd in self.write.borrow().keys() {
        unsafe { FD_SET(*fd, &mut write_fds as *mut fd_set) };
        nfds = std::cmp::max(nfds, fd + 1);
    }

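Here we add the file descriptors from our read and write maps to the fd_sets. Along the way, nfds tracks the highest file descriptor plus one, which select expects as its first argument.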

    // `select` will block until some event happens on the fds or the timeout triggers
    let rv = unsafe {
        select(
            nfds,
            &mut read_fds,
            &mut write_fds,
            std::ptr::null_mut(),
            &mut tv,
        )
    };

    // Panic on error; otherwise just log what happened
    if rv == -1 {
        panic!("select()");
    } else if rv == 0 {
        debug!("timeout");
    } else {
        debug!("data available on {} fds", rv);
    }

Finally, we call select with the arguments we prepared earlier. select() accepts three fd_sets (we ignore the third, which is for exceptional conditions, in this example) and a timeout. It returns the number of file descriptors that are ready, 0 if the timeout expired first, or -1 on error. If any are ready, we then have to find out which ones!

    // Check which fd it was and put appropriate future on run queue
    for (fd, waker) in self.read.borrow().iter() {
        let is_set = unsafe { FD_ISSET(*fd, &mut read_fds as *mut fd_set) };
        if is_set {
            waker.wake();
        }
    }

    // Same for write
    for (fd, waker) in self.write.borrow().iter() {
        let is_set = unsafe { FD_ISSET(*fd, &mut write_fds as *mut fd_set) };
        if is_set {
            waker.wake();
        }
    }

We go through our maps again and check whether each file descriptor is set (ready) in the fd_sets. If it is, we call wake on its associated waker, which in turn puts a Wakeup event on the ready-to-run queue.
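How a call to wake ends up as an entry on a queue isn’t shown above, so here is a minimal sketch of the idea using the standard library’s Wake trait. TaskWaker, RunQueue and waker_for are names invented for this example, and the queue sits behind Arc<Mutex<..>> only because std requires wakers to be Send + Sync; fahrenheit itself is single-threaded and uses RefCell.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

type TaskId = usize;
/// Hypothetical run queue holding the ids of tasks that should be polled.
type RunQueue = Arc<Mutex<VecDeque<TaskId>>>;

/// Hypothetical waker payload: remembers which task to re-queue.
struct TaskWaker {
    id: TaskId,
    queue: RunQueue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // Waking does not poll anything; it only records that the task is runnable.
        self.queue.lock().unwrap().push_back(self.id);
    }
}

/// Builds a std Waker that pushes `id` onto `queue` when woken.
fn waker_for(id: TaskId, queue: &RunQueue) -> Waker {
    Waker::from(Arc::new(TaskWaker {
        id,
        queue: Arc::clone(queue),
    }))
}
```

The point to take away is that wake itself is cheap: it only records the task id, and the actual polling happens later when the loop drains the queue.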

    let mut tasks_done = Vec::new();

    // Now pop tasks from the run queue and poll them
    while let Some(wakeup) = self.run_queue.borrow_mut().pop_front() {
        let mut handle = self.handle();

        if let Some(ref mut task) = self.wait_queue.borrow_mut().get_mut(&wakeup.index) {
            // If a task returns `Poll::Ready`, we're done with it
            if task.poll(wakeup.waker, &mut handle).is_ready() {
                tasks_done.push(wakeup.index);
            }
        }
    }

    // Remove completed tasks
    for idx in tasks_done {
        self.wait_queue.borrow_mut().remove(&idx);
    }

    // Stop the loop if there are no more tasks in the `wait_queue`
    if self.wait_queue.borrow().is_empty() {
        return;
    }

We drain the run_queue, look up each woken task in the wait_queue by its index, and poll it. Tasks that are ready (done) are then removed from the wait_queue.
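The same drain-and-poll step can be tried in isolation with plain std futures. This is a sketch under my own assumptions, not fahrenheit’s code: Task is a boxed future, the waker does nothing, and drain_run_queue is a made-up free function. It also removes a finished task right away rather than collecting ids first, so a duplicate wake-up can never poll a completed future.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

type TaskId = usize;
/// Hypothetical task representation: a boxed future without output.
type Task = Pin<Box<dyn Future<Output = ()>>>;

/// A waker that ignores wake-ups, enough for this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls every task named in `run_queue`, removes the finished ones from
/// `wait_queue`, and returns their ids.
fn drain_run_queue(
    run_queue: &mut VecDeque<TaskId>,
    wait_queue: &mut BTreeMap<TaskId, Task>,
) -> Vec<TaskId> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut done = Vec::new();

    while let Some(id) = run_queue.pop_front() {
        let finished = match wait_queue.get_mut(&id) {
            Some(task) => task.as_mut().poll(&mut cx).is_ready(),
            // Stale wake-up for a task that no longer exists.
            None => false,
        };
        if finished {
            wait_queue.remove(&id);
            done.push(id);
        }
    }
    done
}
```

With one task that completes immediately and one that never does, draining a queue that names both leaves only the pending task in the wait_queue.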

The life of the future

In this section, I’d like to recap how a future (let’s take Read for example) is executed on the event loop:

  • First, it’s created using the read() method on AsyncTcpStream. This method is available on all types that implement the AsyncRead trait.
  • Then it’s spawned on the executor, using either the run() or the spawn() method.
  • The executor calls the poll() method on the future. The poll implementation for Read calls poll_read() on the AsyncTcpStream, which registers its interest in readable events.
  • When an event occurs, the future is polled again. This cycle is repeated until the future is ready.
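The cycle described in this list can be imitated without any sockets. The sketch below is built on assumptions of mine, not on fahrenheit’s API: ReadyOnSecondPoll plays the role of a read that first hits WouldBlock, WokenFlag replaces the run queue with a single flag, and run_sketch is a made-up single-task executor that also counts how often it polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: pending on the first poll, ready on the second.
struct ReadyOnSecondPoll {
    polled_once: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled_once {
            Poll::Ready("data")
        } else {
            self.polled_once = true;
            // A real reactor would wake us once the fd is readable;
            // the sketch wakes itself immediately instead.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical one-slot "run queue": set when the task has been woken.
struct WokenFlag(AtomicBool);

impl Wake for WokenFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls `future` each time it has been woken; returns its output and the poll count.
/// Note: a future that never wakes would make this loop spin forever.
fn run_sketch<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    // Start in the "woken" state so that the first poll happens.
    let flag = Arc::new(WokenFlag(AtomicBool::new(true)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;

    loop {
        if flag.0.swap(false, Ordering::SeqCst) {
            polls += 1;
            if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
                return (output, polls);
            }
        } else {
            // An event loop would block in select() here.
            std::thread::yield_now();
        }
    }
}
```

Running run_sketch on ReadyOnSecondPoll { polled_once: false } takes exactly two polls: the first returns Pending and triggers a wake-up, the second returns the data.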

Thank you

Thanks to the whole futures team, and especially to @aturon for the encouragement and @MajorBreakfast for the edits.

That’s all for today! You can find fahrenheit on GitHub and crates.io. Happy hacking!