# Infrastructure
Now, for everything to work, we need some helpers for our infrastructure. First of all, we need a way to get the id of an available thread.
```rust
fn get_available_thread(&mut self) -> usize {
    match self.available_threads.pop() {
        Some(thread_id) => thread_id,
        // We would normally return None and not panic!
        None => panic!("Out of threads."),
    }
}
```
As you see, we take one huge shortcut here: if we run out of threads, we panic! This is not good, and we should rather implement logic to queue these requests and run them as soon as a thread is available. However, our code is already getting long, and it's not very important for our goal of learning about async.
Maybe implementing such a queue is a good exercise for the reader? Feel free to fork the repository and go ahead :)
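If you want to try, a minimal self-contained sketch of the idea could look something like this. Note that the `Pool` struct and the `usize` standing in for a real task are placeholders for illustration, not part of our runtime:

```rust
use std::collections::VecDeque;

struct Pool {
    available_threads: Vec<usize>,
    // Tasks we couldn't schedule right away (a `usize` stands in for a real task)
    task_queue: VecDeque<usize>,
}

impl Pool {
    /// Returns a thread id if one is free, otherwise queues the task.
    fn schedule(&mut self, task: usize) -> Option<usize> {
        match self.available_threads.pop() {
            Some(thread_id) => Some(thread_id),
            None => {
                // Instead of panicking we park the task until a thread frees up
                self.task_queue.push_back(task);
                None
            }
        }
    }

    /// Called when a thread finishes: run a queued task if there is one.
    fn thread_done(&mut self, thread_id: usize) {
        if let Some(task) = self.task_queue.pop_front() {
            // ...dispatch `task` to `thread_id` here...
            let _ = (task, thread_id);
        } else {
            self.available_threads.push(thread_id);
        }
    }
}

fn main() {
    let mut pool = Pool {
        available_threads: vec![0, 1],
        task_queue: VecDeque::new(),
    };
    assert_eq!(pool.schedule(42), Some(1));
    assert_eq!(pool.schedule(43), Some(0));
    assert_eq!(pool.schedule(44), None); // queued instead of panicking
    pool.thread_done(1); // would dispatch task 44 to thread 1
}
```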
The next thing we need to do is to create a unique identity for our callbacks.
```rust
/// If we hit max we just wrap around
fn generate_identity(&mut self) -> usize {
    self.identity_token = self.identity_token.wrapping_add(1);
    self.identity_token
}

fn generate_cb_identity(&mut self) -> usize {
    let ident = self.generate_identity();
    let taken = self.callback_queue.contains_key(&ident);

    // If there is a collision, i.e. the identity is already there, we loop
    // until we find a new one. We don't cover the case where there are
    // `usize::max_value()` callbacks waiting, since even if we queued a new
    // event every nanosecond, that would still take 585 years on a 64 bit
    // system.
    if !taken {
        ident
    } else {
        loop {
            let possible_ident = self.generate_identity();
            // Break with the candidate only once it's *not* already in use
            if !self.callback_queue.contains_key(&possible_ident) {
                break possible_ident;
            }
        }
    }
}
```
The function `generate_cb_identity` is where it all happens. `generate_identity` is just a small helper function we've split out, so we avoid the long functions we had in the introduction.
Now, there are some important considerations to be aware of. Even though we use several threads, we use a regular `usize` here, and the reason for that is that only one thread will be generating Id's. This could cause problems if several threads tried to read and generate new Id's at the same time.
We use the `wrapping_add` method on `usize` to get the next Id. This means that when we reach 18446744073709551615 (the maximum value of a `usize` on a 64 bit system) we wrap around to 0 again. We do check if our `callback_queue` contains our key (even though that is unlikely by design), and if it's taken we just generate a new one until we find an available one.
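To make the wrap-around concrete, here is a tiny standalone demonstration of `wrapping_add` behaving exactly as described:

```rust
fn main() {
    let max = usize::MAX; // 18446744073709551615 on a 64 bit system
    // A plain `max + 1` would panic in debug builds; wrapping_add wraps to 0
    let next = max.wrapping_add(1);
    assert_eq!(next, 0);
    println!("{} + 1 wraps around to {}", max, next);
}
```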
Next up is the method we use to add a callback to our `callback_queue`:
```rust
/// Adds a callback to the queue under the given key
fn add_callback(&mut self, ident: usize, cb: impl FnOnce(Js) + 'static) {
    let boxed_cb = Box::new(cb);
    self.callback_queue.insert(ident, boxed_cb);
}
```
If you haven't seen the signature `cb: impl FnOnce(Js) + 'static` before, I'll explain it briefly here.
The `impl ...` part means that we accept an argument that implements the trait `FnOnce(Js)` with a `'static` lifetime. `FnOnce` is a trait implemented by closures. There are three main traits a closure can implement in Rust, and `FnOnce` is the one you'll use if you plan on consuming a variable from the environment.
Since you consume the variable, a closure implementing `FnOnce` can only be called once. Our closure will take ownership over resources we create in our main thread and consume them. We want this since, once consumed, the resources we used will be cleaned up as a result of Rust's RAII pattern. It's implicit that `FnOnce` returns `()` in this case, so we don't have to write `FnOnce(Js) -> ()`.

Since callbacks are meant to only be called once, this is a perfectly fine bound for us to use here.
Now, traits don't have a size, so for the compiler to be able to allocate space for one on the stack, we either need to take a reference `&dyn FnOnce(Js)` or place it on the heap using `Box`. We do the latter, since that's the only thing that makes sense for our use case. A `Box` is a pointer to a heap allocated variable which we do know the size of, so we store that pointer in our `callback_queue` HashMap.
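To see this in action, here is a self-contained sketch of the pattern, with a dummy `Js` type standing in for the real one:

```rust
use std::collections::HashMap;

// Stand-in for the runtime's Javascript-like value type
#[derive(Debug)]
enum Js {
    Int(usize),
}

fn main() {
    // The trait object has no size known at compile time, so we Box it
    let mut callback_queue: HashMap<usize, Box<dyn FnOnce(Js)>> = HashMap::new();

    callback_queue.insert(1, Box::new(|res| println!("callback got {:?}", res)));

    // Calling a boxed FnOnce consumes it, so we remove it from the map first
    if let Some(cb) = callback_queue.remove(&1) {
        cb(Js::Int(42));
    }
}
```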
What makes a closure? A function in Rust can be defined as easily as `|| { }`. If this is all we write, it's the same as a function pointer, equivalent to just referencing `my_method` (without parentheses). It becomes a closure as soon as you "close" over your environment by referencing variables that are not owned by the function.

The `Fn` traits are automatically implemented, and whether a closure implements `Fn`, `FnMut` or `FnOnce` depends on whether you take ownership over a non-copy variable, take a shared reference `&`, or take an exclusive reference `&mut` (often called a mutable reference).
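A short demonstration might make the three traits easier to tell apart. This is not code from our runtime, just an illustration of which trait the compiler infers from how each closure captures its environment:

```rust
fn main() {
    let name = String::from("event");
    let mut count = 0;

    // Fn: only reads `name` through a shared reference
    let read_it = || println!("inspecting {}", name);
    read_it();
    read_it();

    // FnMut: mutates `count` through an exclusive (&mut) reference
    let mut bump_it = || count += 1;
    bump_it();
    bump_it();

    // FnOnce: takes ownership of `name` and consumes it,
    // so it can only be called once
    let consume_it = move || drop(name);
    consume_it();
    // consume_it(); // would not compile: `name` was already consumed

    println!("count is now {}", count);
}
```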
Now that we've got some closure basics out of the way, we can move on. The next method is how we register I/O work. This is how we register an `epoll` event with our runtime:
```rust
pub fn register_event_epoll(&mut self, token: usize, cb: impl FnOnce(Js) + 'static) {
    self.add_callback(token, cb);

    print(format!("Event with id: {} registered.", token));
    self.pending_events += 1;
    self.epoll_pending_events += 1;
}
```
The first thing we do is to add the callback to our `callback_queue`, calling the method we explained previously. Next we add a print statement; since we want to trace the flow of our program, we need to add these at strategic places.
One important thing to note here: our `token` in this case is already guaranteed to be unique. We generate it in the `Http` module (which is the only one registering events by using this method in our example). The reason for this will become clear in a few short chapters. Just note that we don't need to call `generate_cb_identity` here.
We increase the counters on both `pending_events` and `epoll_pending_events`.
Our next method registers work for the thread pool:
```rust
pub fn register_event_threadpool(
    &mut self,
    task: impl Fn() -> Js + Send + 'static,
    kind: ThreadPoolTaskKind,
    cb: impl FnOnce(Js) + 'static,
) {
    let callback_id = self.generate_cb_identity();
    self.add_callback(callback_id, cb);

    let event = Task {
        task: Box::new(task),
        callback_id,
        kind,
    };

    // we are not going to implement a real scheduler here, just a LIFO queue
    let available = self.get_available_thread();
    self.thread_pool[available].sender.send(event).expect("register work");

    self.pending_events += 1;
}
```
Let's first have a look at the arguments to this function (aside from `&mut self`). `task: impl Fn() -> Js + Send + 'static` is a task we want to run on a separate thread. This argument has the bound `Fn() -> Js + Send + 'static`, which means it's a closure that takes no arguments but returns a type of `Js`. It needs to be `Send` since we're sending this task to another thread.
`kind: ThreadPoolTaskKind` lets us know what kind of task this is. We do this for two reasons:

- We need to be able to signal a `Close` event to our threads
- We want to be able to print the kind of task each event received
As you understand, we don't have to create a `Kind` for every task, but since we want to print out what the thread received, we need some way of identifying what kind of task each thread got.
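We haven't shown the definition of `ThreadPoolTaskKind` in this chapter, but a minimal sketch of such an enum could look like the following. The exact variants here (apart from `Close`, which we mentioned above) are assumptions for illustration:

```rust
use std::fmt;

pub enum ThreadPoolTaskKind {
    FileRead,
    Encrypt,
    Close,
}

// Implementing Display lets each thread print which kind of task it received
impl fmt::Display for ThreadPoolTaskKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ThreadPoolTaskKind::FileRead => write!(f, "File read"),
            ThreadPoolTaskKind::Encrypt => write!(f, "Encrypt"),
            ThreadPoolTaskKind::Close => write!(f, "Close"),
        }
    }
}

fn main() {
    println!("Event of type: {}", ThreadPoolTaskKind::FileRead);
}
```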
The last argument, `cb: impl FnOnce(Js) + 'static`, is our callback. It's not a coincidence that our task returns a type of `Js` and our callback takes a `Js` as an argument: the result of the work we do in our thread is the input to our callback. This closure doesn't need to be `Send` since we don't pass the callback itself to the thread pool.
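To make this handoff concrete, here is a miniature, self-contained version of it. Our real runtime doesn't block on `join` like this, and the `Js` enum is a stand-in, but the flow from task result to callback argument is the same idea:

```rust
use std::thread;

// Stand-in for the runtime's Javascript-like value type
#[derive(Debug)]
enum Js {
    Int(usize),
}

// The closure we send to the thread produces a `Js`, and that
// same `Js` is what the callback receives as its argument.
fn run(task: impl Fn() -> Js + Send + 'static, cb: impl FnOnce(Js) + 'static) {
    let handle = thread::spawn(move || task());
    let result = handle.join().expect("thread panicked");
    cb(result); // the task's output is the callback's input
}

fn main() {
    run(
        || Js::Int(2 + 2),                        // runs on the spawned thread
        |res| println!("callback got {:?}", res), // runs back on our thread
    );
}
```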
Next we generate a new identity with `self.generate_cb_identity()` and we add the callback to our callback queue.
Then we construct a new event of type `Task`, and as I have shown earlier, we need to `Box` the closure.
Now, the last part could be made arbitrarily complex. This is where you decide how you want to schedule your work to the thread pool. In our case we just get an available thread (and panic! if we're out of threads - ouch), and we send our task to that thread, which then runs it until it's finished.
You could prioritize based on `TaskKind`, or you could try to decide which tasks are short and which are long and prioritize them based on load. A lot of exciting things could be done here. We will choose the simplest approach possible though, and just push tasks directly to a thread in the order they come.
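Just to illustrate one of those alternatives, here is a standalone sketch (not what our runtime does) that dispatches tasks by a priority derived from the task kind, using a `BinaryHeap`:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical task kinds for this sketch
enum TaskKind {
    Close,
    FileRead,
    Encrypt,
}

fn priority(kind: &TaskKind) -> u8 {
    match kind {
        TaskKind::Close => 0, // most urgent
        TaskKind::FileRead => 1,
        TaskKind::Encrypt => 2,
    }
}

fn main() {
    let tasks = vec![TaskKind::Encrypt, TaskKind::Close, TaskKind::FileRead];
    let mut queue = BinaryHeap::new();

    for (id, kind) in tasks.into_iter().enumerate() {
        // Reverse turns the max-heap into a min-heap, so the lowest
        // priority number is popped (dispatched) first
        queue.push((Reverse(priority(&kind)), id));
    }

    while let Some((Reverse(p), id)) = queue.pop() {
        println!("dispatching task {} (priority {})", id, p);
    }
}
```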
The last part of the "infrastructure" is a function to set a timeout.
```rust
fn set_timeout(&mut self, ms: u64, cb: impl Fn(Js) + 'static) {
    // Is it theoretically possible to get two equal instants? If so we'll have a bug...
    let now = Instant::now();
    let cb_id = self.generate_cb_identity();
    self.add_callback(cb_id, cb);

    let timeout = now + Duration::from_millis(ms);
    self.timers.insert(timeout, cb_id);
    self.pending_events += 1;

    print(format!("Registered timer event id: {}", cb_id));
}
```
`set_timeout` uses `std::time::Instant` to get a representation of "now". It's the first thing we do, since the user expects the timeout to be calculated from "now", and some of our operations here might take a little time.
We generate an identity for the callback `cb` passed in to `set_timeout`, and add that callback to our callback queue.
We add the duration in milliseconds to our `Instant`, so we know at what point in time our timeout times out.
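The arithmetic itself is straightforward, since `std::time` lets us add a `Duration` to an `Instant` directly:

```rust
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    // Adding a Duration to an Instant gives the Instant at which the timer fires
    let timeout = now + Duration::from_millis(750);
    assert!(timeout > now);
    println!("expires in {:?}", timeout - now);
}
```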
We insert the `callback_id` into our `BTreeMap` of timers, with the calculated `Instant` as the key.
We increase the counter for `pending_events` and print out a message, so we're able to follow the flow of our program.
This might be a good time to talk briefly about our choice of a `BTreeMap` as the collection we store timers in. From the documentation we can read: "In theory, a binary search tree (BST) is the optimal choice for a sorted map, as a perfectly balanced BST performs the theoretical minimum amount of comparisons necessary to find an element (log2n)."

Now, this isn't a binary tree but a BTree. While a BST allocates one node for each value, a BTree allocates a small `Vec` of values for each node. Modern computers read much more data than we normally ask for into their caches, and that's one reason they love contiguous parts of memory. A BTree will result in better "cache efficiency", which often trumps the gains of the theoretically more optimal algorithm in a true BST.

Lastly, since we're talking about searching sorted collections here, and timeouts are a perfect example of such, we'll of course use this when it's so readily available to us in Rust's standard library.
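As a small demonstration of why a sorted map fits timers so well, this standalone sketch asks a `BTreeMap` for every timer that has expired "as of now" with a single range query:

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let mut timers: BTreeMap<Instant, usize> = BTreeMap::new();
    timers.insert(now + Duration::from_millis(10), 1);
    timers.insert(now + Duration::from_millis(500), 2);

    std::thread::sleep(Duration::from_millis(20));

    // Since a BTreeMap keeps its keys sorted, finding every timer that
    // has expired "as of now" is a cheap range query
    let current = Instant::now();
    let due: Vec<usize> = timers.range(..=current).map(|(_, id)| *id).collect();
    for id in &due {
        println!("timer with callback id {} has expired", id);
    }
    // (a real runtime would also remove the expired entries)
}
```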