Tracking Asynchronous IO Using Type Systems

Some time ago I gave a short presentation to some colleagues of mine about the
Python gevent library, and the low-level libraries it uses to perform its job
(which mainly boils down to handling asynchronous IO and managing the
microthreads waiting for these asynchronous actions to complete, using libev or
libevent as a wrapper around select/poll/epoll and the greenlet hack to support
lightweight threads in Python).

The gevent library contains a module which monkey-patches several modules in the
Python standard library to change their synchronous nature into an asynchronous
implementation running on top of the gevent mainloop, including socket and
thread. This is required to work around one of the major pitfalls of using
async IO in a runtime like Python: any existing code/library/… which performs
(standard) blocking calls will block the (single-threaded, from an OS
perspective) mainloop, and as such inhibit execution of any runnable
microthreads at the time. (This is where e.g. node.js has an edge over Python
when developing highly concurrent servers. Do not fear, I won’t get into the
topic of whether or not it’s a good idea to write highly concurrent and
scalable services in Python or Javascript in this post.)

Next to providing a mechanism to handle concurrent IO, the use of greenlets to
manage threads of execution also introduces another useful property of gevent:
instead of threads backed by OS threads, executed in parallel on an SMP system,
using preemptive multitasking, the lightweight threads are scheduled in
userspace by the library itself, in a cooperative manner: the places at which a
thread of execution yields execution (also known as ‘switch’ in gevent) are
explicitly defined inside the code (whether it’s yours or in some library). As
such one can make assumptions (or rather, assertions) about code which uses
mutable data shared across execution threads: it can be thread-safe in the
cooperative setting, whilst it would be unsafe in a preemptive scenario.

The latter raised an interesting question from a colleague: is there a way to
assert some code will not yield execution, i.e. some section will always be
executed as an atomic block? This is, obviously, an important consideration
when you rely on this property to write lock-free code which would become
incorrect if the thread could be switched out!

I answered (maybe to their surprise) this is certainly possible and standard
practice in some languages, yet as far as I know not in Python (or, at least,
not very elegantly). I didn’t get into this any further at the time, yet here’s
a post in which we will devise such a tracking system.

It’s based on several concepts from the world of Functional Programming, yet
don’t fear: you don’t need any knowledge about any FP concepts at all. Read on
and enjoy the ride!

Asynchronous Programming

As you’ve most likely heard, there are 2 ways data can be provided to something
making a request: synchronously, or asynchronously. In the former, the
requesting entity will wait (block) until it receives a response, then
continue execution. When using the asynchronous pattern, some code will issue a
request, then tell the system what to do when the request has completed and a
result is available, and then continue working on something else, or yield
execution to some other execution thread (please make sure to make a strong
distinction between thread of execution and system thread: there might be
some relation between these in some programming patterns, yet conceptually
they’re different beasts!).

Note there’s a major difference between execution mechanisms (sync vs. async)
and APIs: whilst some APIs explicitly expose their asynchronous execution
nature by using callbacks (think node.js, Twisted’s Deferred type, which is a
sort of callback-on-steroids,…), others might look as if every call is a
blocking procedure call, yet internally everything is dispatched using some
asynchronous mechanism (think CPS, ‘Future’-style APIs which get compiled
into something using CPS, or (you guessed it) systems like gevent or the IO
manager and microthreads as found in GHC’s Haskell runtime).

The differences (or should I say, similarities?) of these styles deserve a
complete post on their own though!

Enter Types

Whenever we want to track something at the code level (which basically means
at compile time), all we can use is what is known at compile time: types. We
can’t rely on any values, since there are no actual values available yet (go
figure: since much use of asynchronous programming is found when handling IO,
it’d be funny to know what we’ll receive on a socket at runtime while compiling
the program)!

We invent a name for the type which will be used to tag all values which we
received thanks to some sort of asynchronous call (which might introduce a
yield point): we’ll call it async. Now this is rather useless: async
what?

Those of you who toyed with generics before in Java, C#, Scala or anything
alike might think the answer is rather trivial: we create a generic type to
tag the actual type of the value we retrieved using an async call! These
generic types are, loosely speaking, what FP calls higher-kinded types, yet you
shouldn’t care too much about this for the rest of this discourse.

So, we now got our type, written as 'a async (using OCaml notation). This is
similar to Async<A> in Java and C# (as far as I remember). For those new to
generics: think of a list of items. One could have a list of strings, one of
ints, myobjects or anything else. The type of these lists will then be string
list, int list and myobject list. We can write functions which can act on
any kind of list (e.g. check whether the list is empty: it doesn’t matter
what type of values the list contains!). As such the list type is defined as a
more generic type which takes a type parameter: 'a list. It’s
parametrised over type 'a.

So, char async is a char we obtained using some async action, and
a value of type string async is a string we got through some async call (it’s
an action which will yield a string value). Do note string async and string
are completely different types, as such it’s not (and should not be!) possible
to pass a value of type string async to a function which takes a string
value as its input. You could consider a value of type string to be a real
string value, a sequence of characters, whilst a value of type string async
is the representation of "a string value we’ll retrieve using some async call",
which is not the value itself.

Note this type comes pretty close to a Future, which is also often
parametrised (a generic class) Future<A>, which signifies "a box which will
eventually contain a value of type A".
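
To see a compiler enforce this distinction, here is a minimal sketch. The module name Async_demo and the function pretend_read are made up for illustration, and so is the internal representation; the only point is that once the representation of the async type is hidden behind a signature, a string async cannot be used where a string is expected.

```ocaml
(* Hypothetical module: the signature keeps the async type abstract, so
   outside code cannot look inside a value of this type. *)
module Async_demo : sig
  type 'a async
  (* Stand-in for some asynchronous call that will yield a string. *)
  val pretend_read : unit -> string async
end = struct
  (* Illustrative representation only: a computation that has not run yet. *)
  type 'a async = Pending of (unit -> 'a)
  let pretend_read () = Pending (fun () -> "hello")
end

(* An ordinary function on plain strings. *)
let shout (s : string) : string = String.uppercase_ascii s

let pending : string Async_demo.async = Async_demo.pretend_read ()

(* Uncommenting the next line makes the compiler reject the program,
   because pending is a string async and shout wants a string:

   let oops = shout pending
*)

(* Passing a real string is fine. *)
let fine = shout "hello"
```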

Primitives

For our system to work, we’ll need a couple of primitive, magic operations,
provided by the system, which are the basic building blocks of everything
above. These are the operations which implement the most low-level async
procedures which can only be provided by the runtime system. Everything else
can be constructed on top of these by combining functions and actions and as
such building our applications, composing entities into larger entities.

We will only introduce 3 of these primitives, which should be sufficient for
the mechanism outlined in this post to be clear. In a real, production library
or runtime system one would need a few more of these primitive actions to be
able to do something interesting, of course!

Of these 3 actions, 2 work on file descriptors: one writes a given string to a
given file descriptor and doesn’t return any result (we assume the data will
always be written at once, and this will never fail, unlike the write(2)
system call, obviously), the other reads a string of requested length from a
given file descriptor (the same principle with regard to success and failure
applies).

The third action allows a thread to sleep for a given amount of time (say,
seconds, although that obviously doesn’t make any difference). The result of
this action contains the number of seconds the thread actually did sleep.

Here are the prototypes. Note unit is a type with only a single value, (),
which is used when some other languages use void (it’s not exactly the same
thing, but for now this comparison should hold):

write : file_descr -> string -> unit async
read : file_descr -> int -> string async
sleep : int -> int async
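
These prototypes can be written down as an OCaml module signature. The sketch below does that and adds a toy, in-memory implementation to experiment with. Everything beyond the three prototypes is an assumption made for illustration: the names PRIMITIVES and Toy, representing an action as a delayed computation, the in-memory file_descr, the helpers make_fd, contents and run, and a sleep which always reports exactly the requested duration. A real runtime would talk to the operating system, and would not hand out something like run.

```ocaml
(* The three primitives from the text, as a module signature. *)
module type PRIMITIVES = sig
  type file_descr
  type 'a async
  val write : file_descr -> string -> unit async
  val read : file_descr -> int -> string async
  val sleep : int -> int async
end

(* Toy implementation: a descriptor is a string still to be read plus a
   buffer collecting everything written to it. *)
module Toy : sig
  include PRIMITIVES
  val make_fd : string -> file_descr     (* illustration only *)
  val contents : file_descr -> string    (* illustration only *)
  val run : 'a async -> 'a               (* test harness only *)
end = struct
  type file_descr = { mutable input : string; output : Buffer.t }
  type 'a async = Async of (unit -> 'a)

  let make_fd input = { input; output = Buffer.create 16 }
  let contents fd = Buffer.contents fd.output
  let run (Async f) = f ()

  let write fd s = Async (fun () -> Buffer.add_string fd.output s)

  (* Yields at most n characters and removes them from the pending input. *)
  let read fd n =
    Async (fun () ->
      let n = min n (String.length fd.input) in
      let chunk = String.sub fd.input 0 n in
      fd.input <- String.sub fd.input n (String.length fd.input - n);
      chunk)

  (* Does not really sleep; reports the requested number of seconds. *)
  let sleep seconds = Async (fun () -> seconds)
end

let fd = Toy.make_fd "abcdef"
let first = Toy.run (Toy.read fd 2)
let () = Toy.run (Toy.write fd first)
let written = Toy.contents fd
```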

Keeping the Genie in the Bottle

Ok, first steps are done: we got a type to wrap values which were obtained
using an async action, and we got a couple of actions defined. Next step: doing
something useful with these operations!

Let’s start with a very simple exercise: writing an action which reads a single
character from standard input (a value named stdin of type file_descr) and
writes it to standard output (a value named stdout of type file_descr).

First, we’ll create 2 utility functions to handle standard input and output, by
using partial application (if you don’t know what this is, consider the
following example: given the function mul : int -> int -> int which
multiplies 2 integers, we can use partial application and call mul with a
single argument, e.g. 10, which results in a function of type int -> int.
Whenever passing an argument to this function, the result will be 10 times this
argument):

let read_stdin = read stdin
let write_stdout = write stdout

The types:

read_stdin : int -> string async
write_stdout : string -> unit async
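
For readers new to partial application, the mul example mentioned above looks like this in OCaml (times_ten and result are just illustrative names):

```ocaml
(* mul takes two integers, one at a time. *)
let mul x y = x * y

(* Supplying only the first argument yields a function of type int -> int. *)
let times_ten = mul 10

(* Supplying the remaining argument finally performs the multiplication. *)
let result = times_ten 4
```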

Now we could try to write our program:

let attempt () =
    let char = read_stdin 1 in
    let _ = write_stdout char in
    ()

Failure all over! First of all, the code will not type-check:

Error: This expression has type string async
       but an expression was expected of type string

referring to the char argument on the line calling write_stdout.

Here’s why: write_stdout wants its argument to be of type string
(this is the type of the second argument of write, as you know), but the
value we provide, named char, is not of type string: its type is
string async, the return type of the read_stdin (or, further down, read)
action!

Next to that, even if the code did type-check, we would have failed our
initial goal: the type of attempt would be unit -> unit, which doesn’t signify
we used some of the async primitives at all! Our action must return something
of type unit async, and there should be no way to write an action whose
return type is not 'a async for some type 'a!

Back to the drawing board… It looks like standard assignment and passing
values around as-is won’t work (remember I stressed it’s important to make the
distinction between a string value and something representing some string
retrieved using some async action of type string async?).

We dig into our FP toolkit once more, and grab another hammer (after the type
system we used earlier): function composition! Or, in this case, action
composition.

What we want is a function we can use to link two actions together into a
new action!

Let’s try to figure out some type signature for such a function:

link : 'a async -> 'b async -> 'b async

link takes an initial action which yields something of type 'a, a second
action which yields a 'b, and combines these into an action which also yields
a 'b.

But wait, this can’t be right either! In this setup, the second action still
has no way to retrieve and actually use the 'a value as encapsulated by the
first action!

We need to extend our link function to unwrap whatever the first given action
yields, then pass it to the second argument as a proper value it can use,
whilst still making sure the fact there’s something asynchronous going on is
retained.

Here’s what the type of our link function should look like:

link : 'a async -> ('a -> 'b async) -> 'b async

The second argument now became a function (cool, uh, functions taking functions
as arguments? These are also called "higher-order functions" (remember
"higher-kinded types"?) and are very powerful) which takes a value of type
'a, and results in an action of type 'b async, which is then used as the
result of the link function.

Note how you can see, only by looking at the type signature, that the actual 'a
value can never leak out other than wrapped inside some 'b async action: that’s
what the second argument must return, and only this function ever gets access
to the plain 'a value.

Do note we will not discuss how link is implemented, since this is
unrelated to this post, and depends on how the whole library is designed and
implemented!
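
For the curious, here is one deliberately naive way to satisfy that signature. It is a sketch under a strong assumption, namely that an action is nothing more than a delayed computation; a real library with an event loop would do something quite different. The run helper and the sample actions produce and add_one are made up, and only exist to observe a result.

```ocaml
(* Assumption: an action is a computation that has not run yet. *)
type 'a async = Async of (unit -> 'a)

(* link first next builds a new action which, when executed, runs first,
   hands its result to next, then runs the action that next returned. *)
let link (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

(* Test harness only: a real system would keep this inside its scheduler. *)
let run (Async f) = f ()

(* Two stand-in actions to link together. *)
let produce = Async (fun () -> 20)
let add_one n = Async (fun () -> n + 1)

(* Nothing is executed until run is called on the combined action. *)
let linked = link produce add_one
let result = run linked
```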

Let’s get back to our single-character echo action: using link, we need an
initial action of type 'a async. We got that: read_stdin 1 has type
string async. Ok, next we need a function of type 'a -> 'b async. We know
what 'a is now, since we already decided to use read_stdin 1 as first
argument, so 'a is string. We’re looking for a function of type
string -> 'b async which writes the given string to the screen. This is easy:
write_stdout has exactly this type, using unit for 'b!

Here goes:

let echo_one = link (read_stdin 1) write_stdout

The type of echo_one is unit async, like we want it to be!

From now on though, we won’t use the name link anymore: this is just
something I made up. A more common name for such a function is bind, which
we’ll use from now on. Next to that, there’s an infix operator (an infix
operator is a function which takes 2 arguments and is placed in-between
these arguments instead of before them, like the + in 1 + 2) called >>=.
This allows us to rewrite our echo_one action like this:

let echo_one' = read_stdin 1 >>= write_stdout
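
In OCaml an infix operator is an ordinary function whose name is written between parentheses when it is defined. A minimal sketch, assuming once more that an action is a delayed computation (the type definition, run, and the sample actions greeting and exclaim are illustrative only):

```ocaml
(* Assumption: an action is a computation that has not run yet. *)
type 'a async = Async of (unit -> 'a)

let bind (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

(* The operator is just another name for bind. *)
let ( >>= ) = bind

(* Test harness only. *)
let run (Async f) = f ()

let greeting = Async (fun () -> "hi")
let exclaim s = Async (fun () -> s ^ "!")

(* Both spellings build the same kind of action. *)
let prefix_style = run (bind greeting exclaim)
let infix_style = run (greeting >>= exclaim)
```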

Let’s make things more interesting: writing an action which reads 10 characters
from standard input, then sleeps at most 2 seconds, then writes these
characters to some given file descriptor:

let sleepy_writer out =
    read_stdin 10 >>= fun data ->
    sleep 2 >>= fun _ ->
    write out data

You might notice the use of indentation and anonymous functions, and we ignore
the result of the sleep action (we use _ as its binding name), but the
code should be easy to understand.

If this isn’t clear, here’s how to read the above snippet: we define a function
called sleepy_writer which takes a single argument called out. Upon
invocation, the function will cause 10 chars to be read from stdin.
read_stdin 10 is an action which, upon completion, will yield a string. We
bind a function which takes this string value (which we bind to the name
data in an anonymous function) and returns another action: everything starting
with sleep up until the very end of the function body. So, once read_stdin 10
has completed, we continue with this next action, which will make the
current thread of execution sleep for 2 seconds. Once again, we bind this to a
function which takes the value which the sleep 2 action yields and ignores
it (by binding it to the name _), then results in one last action which will
be executed as well, and will write the data value (which is at that moment in
scope!) to the given out file descriptor. The result of the sleepy_writer
action will be the result of the write action.
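
The nesting becomes visible when the anonymous functions are parenthesised explicitly. The sketch below runs both spellings against toy primitives. The delayed-computation representation, the in-memory file_descr, make_fd, contents, run, and a sleep which merely reports the requested duration are all assumptions for illustration, as is the name sleepy_writer_explicit.

```ocaml
(* Toy model, for illustration only. *)
type 'a async = Async of (unit -> 'a)

let ( >>= ) (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

let run (Async f) = f ()

type file_descr = { mutable input : string; output : Buffer.t }
let make_fd input = { input; output = Buffer.create 16 }
let contents fd = Buffer.contents fd.output

let read fd n =
  Async (fun () ->
    let n = min n (String.length fd.input) in
    let chunk = String.sub fd.input 0 n in
    fd.input <- String.sub fd.input n (String.length fd.input - n);
    chunk)

let write fd s = Async (fun () -> Buffer.add_string fd.output s)
let sleep seconds = Async (fun () -> seconds)

(* An in-memory stand-in which shadows the real stdin. *)
let stdin = make_fd "0123456789abcdef"
let read_stdin = read stdin

(* The version from the text. *)
let sleepy_writer out =
  read_stdin 10 >>= fun data ->
  sleep 2 >>= fun _ ->
  write out data

(* The same action, with every anonymous function parenthesised. *)
let sleepy_writer_explicit out =
  read_stdin 10 >>= (fun data ->
    (sleep 2 >>= (fun _ ->
      write out data)))

(* The first run consumes 10 characters, the second gets what is left. *)
let out1 = make_fd ""
let () = run (sleepy_writer out1)
let out2 = make_fd ""
let () = run (sleepy_writer_explicit out2)
```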

Try to figure out the type of sleepy_writer. Got it?

sleepy_writer : file_descr -> unit async

Notice we didn’t get rid of the async marker!

Finally, an action which keeps reading chunks of data of given chunk size from
a given file descriptor, then writes it to another given file descriptor:

let rec copy_stream chunk_size in_fd out_fd =
    read in_fd chunk_size >>= fun data ->
    write out_fd data >>= fun () ->
    copy_stream chunk_size in_fd out_fd

Even though copy_stream is infinitely-recursive, its type can be calculated:

copy_stream : int -> file_descr -> file_descr -> 'a async

Yet again, the async marker sticks.

Do note that in real-world code one would rather not use a top-level rec
definition, but define a local loop action inside the function body.
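
A local loop could look like the sketch below: the recursion moves into an inner function, so the arguments which never change are captured once. The toy primitives are assumptions for illustration (delayed-computation actions, an in-memory file_descr, make_fd, contents, run), and so is the decision to let the toy read raise End_of_file on exhausted input, which is only there to make the example terminate.

```ocaml
(* Toy model, for illustration only. *)
type 'a async = Async of (unit -> 'a)

let ( >>= ) (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

let run (Async f) = f ()

type file_descr = { mutable input : string; output : Buffer.t }
let make_fd input = { input; output = Buffer.create 16 }
let contents fd = Buffer.contents fd.output

(* Assumption: reading from exhausted input raises End_of_file. *)
let read fd n =
  Async (fun () ->
    if fd.input = "" then raise End_of_file;
    let n = min n (String.length fd.input) in
    let chunk = String.sub fd.input 0 n in
    fd.input <- String.sub fd.input n (String.length fd.input - n);
    chunk)

let write fd s = Async (fun () -> Buffer.add_string fd.output s)

(* Same behaviour as the top-level rec version, with a local loop. *)
let copy_stream chunk_size in_fd out_fd =
  let rec loop () =
    read in_fd chunk_size >>= fun data ->
    write out_fd data >>= fun () ->
    loop ()
  in
  loop ()

let source = make_fd "hello, world"
let sink = make_fd ""

(* The copy only stops because the toy read raises at end of input. *)
let () = try run (copy_stream 5 source sink) with End_of_file -> ()
```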

Return, But Not As You Know It

One last step in our journey is required. Up until now all actions we created
by combining other actions using bind had the result value of the last
action in this chain as their result, whilst in some actions we want to
calculate the result ourselves and yield that value. Due to the constraints we
wanted to achieve (and as imposed by bind, the only function we can use to
actually use actions) we can’t just use plain values: they need to be wrapped
in the async container as well. So here’s what we need: something which turns
an 'a into an 'a async:

whatever_it_is_called : 'a -> 'a async

For some reason, this function is mostly called return. Don’t be mistaken,
this is completely unrelated to the return statement as found in
C/Java/C#/Python/PHP/… and has nothing to do with ending the execution of a
procedure and signalling some result value or anything alike. It’s a normal
function to put a value in an async box, even though this value itself was not
retrieved using some async action as-is:

return : 'a -> 'a async

Given this, we can write some more interesting actions. As a first example,
let’s write an action which reads a single line from a given file descriptor by
reading characters one-by-one until it finds a '\n' character, then yields
the line it read (without the newline):

let read_line fd =
    let rec loop acc =
        read fd 1 >>= fun char ->
        if char = "\n"
        then return acc
        else loop (acc ^ char)
    in
    loop ""

If you’re not versed in FP yet, this might be somewhat hard to read and
understand at first. Take a second look and follow the execution flow manually,
it’ll become clear. It might be useful to know the ^ operator is used to
concatenate strings:

(^) : string -> string -> string

Did you try to figure out the type of read_line? It’s as intended:

read_line : file_descr -> string async
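
To follow the execution flow with something that actually runs, here is read_line on top of a toy model. The representation of actions as delayed computations, the in-memory file_descr, make_fd and run are assumptions for illustration. Note the toy read yields an empty string once its input is exhausted, so a last line without a trailing newline would make the loop spin forever; the sample input avoids that.

```ocaml
(* Toy model, for illustration only. *)
type 'a async = Async of (unit -> 'a)

let return x = Async (fun () -> x)

let ( >>= ) (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

let run (Async f) = f ()

(* A descriptor is just the text which is still waiting to be read. *)
type file_descr = { mutable input : string }
let make_fd input = { input }

let read fd n =
  Async (fun () ->
    let n = min n (String.length fd.input) in
    let chunk = String.sub fd.input 0 n in
    fd.input <- String.sub fd.input n (String.length fd.input - n);
    chunk)

(* read_line as in the text: accumulate characters up to a newline. *)
let read_line fd =
  let rec loop acc =
    read fd 1 >>= fun char ->
    if char = "\n"
    then return acc
    else loop (acc ^ char)
  in
  loop ""

let fd = make_fd "first line\nsecond line\n"
let line1 = run (read_line fd)
let line2 = run (read_line fd)
```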

One more example: since the sleep action might return even before the
requested number of seconds has passed (don’t ask me why, I just made that up),
we want to write an action which sleeps at least the given number of seconds,
and as little more as possible (otherwise we could sleep eternally. Nice
try!). We don’t care how long we slept in the end (which is a rather stupid
requirement: a serious API would return this value, and a caller is free to
ignore this).

Here we go:

let sleep_min sec =
    let rec loop left =
        sleep left >>= fun slept ->
        (* Done once we slept for at least the remaining time. *)
        if slept >= left
        then return ()
        else loop (left - slept)
    in
    loop sec

The type of sleep_min? int -> unit async.
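
A runnable sketch of this loop, using a toy sleep which never sleeps more than 2 seconds per call so the early-return case actually occurs. The toy model (delayed-computation actions, run, the counters total_slept and calls, and the 2 second cap) is an assumption for illustration. The loop stops as soon as slept >= left, so that sleeping for exactly the remaining time also ends it.

```ocaml
(* Toy model, for illustration only. *)
type 'a async = Async of (unit -> 'a)

let return x = Async (fun () -> x)

let ( >>= ) (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

let run (Async f) = f ()

(* Bookkeeping to observe what the toy sleep did. *)
let total_slept = ref 0
let calls = ref 0

(* Pretends to sleep, but gives up after at most 2 seconds per call. *)
let sleep seconds =
  Async (fun () ->
    let slept = min seconds 2 in
    total_slept := !total_slept + slept;
    incr calls;
    slept)

(* Keep sleeping for the remainder until the requested time has passed. *)
let sleep_min sec =
  let rec loop left =
    sleep left >>= fun slept ->
    if slept >= left
    then return ()
    else loop (left - slept)
  in
  loop sec

(* Asking for 5 seconds takes three calls here: 2 + 2 + 1. *)
let () = run (sleep_min 5)
```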

Conclusion

Here we are! Using the 'a async type, bind and return, we have a system
which allows us to combine and use asynchronous actions, whilst being sure we
can never forget something async is going on under the hood, no matter how
complex the actions themselves become. If we don’t see something of the
'a async type, we can be certain nothing is using the async primitives
underneath.

Notice how we were able to implement something which gives what we wanted from
the beginning, without any specific support in the language we’re using:
only fairly standard type system requirements, and functions, as available in
several languages, including OCaml, Haskell and several others (although in
languages without first-class functions etc. syntax might become an
issue, think of Java and the like).

Thanks to the use of types, the compiler can be very helpful during development
to reduce the number of potential runtime issues. Even though a system like the
above can be implemented in dynamically typed languages like Python or Ruby, having
compile-time type checking offers a lot of safety!

Note this has been a very basic introduction, so now comes…

Going From Here

Once you’ve reached this point, you might want to get to know more about the
mechanics behind the system outlined above. As some might have heard, bind
and return are often used in the presence of monads, and indeed, one might
think our 'a async type is monadic (it might be, but not necessarily: the
monad laws won’t be fulfilled in the presence of real IO). Overall monads
provide a way to track "things with a tag which should always keep this tag".
The above is a very informal definition and introduction, but the interested
reader might refer to one of the many monad tutorials available on the internet
(all of varying quality and usefulness).

Next to this, reading up on functors (not the OCaml kind, but things with the
function fmap : ('a -> 'b) -> 'a f -> 'b f) could be useful as well (our
'a async type is a functor, like any other monad:
let fmap f a = a >>= fun v -> return (f v)).
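
A small sketch of fmap in use, again on top of a toy model in which an action is a delayed computation (that representation, run and the sample value length_action are assumptions for illustration). It applies a plain function to the eventual result of an action without ever leaving the async type.

```ocaml
(* Toy model, for illustration only. *)
type 'a async = Async of (unit -> 'a)

let return x = Async (fun () -> x)

let ( >>= ) (Async first) next =
  Async (fun () ->
    let (Async second) = next (first ()) in
    second ())

(* Test harness only. *)
let run (Async f) = f ()

(* fmap built from bind and return, as in the text. *)
let fmap f a = a >>= fun v -> return (f v)

(* Turn a string async into an int async using a plain function. *)
let length_action = fmap String.length (return "hello")
let result = run length_action
```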

Some links: