Tracking Asynchronous IO Using Type Systems

Some time ago I gave a short presentation to some colleagues of mine about the
Python gevent library, and the low-level libraries it uses to perform its job
(which mainly boils down to handling asynchronous IO and managing the microthreads
waiting for these asynchronous actions to complete, using libev or libevent as
a wrapper around select/poll/epoll and the greenlet hack to support lightweight
threads in Python).

The gevent library contains a module which monkey-patches several modules in the
Python standard library to change their synchronous nature into an asynchronous
implementation running on top of the gevent mainloop, including socket and
thread. This is required to work around one of the major pitfalls whenever
trying to use async IO in a runtime like Python: any existing code/library/…
which performs (standard) blocking calls somehow, will block the
(single-threaded, from an OS perspective) mainloop, and as such inhibit
execution of any runnable microthreads at the time (this is something where e.g.
node.js has an edge over Python when developing highly concurrent servers. Do
not fear, I won’t get into the topic of whether or not it’s a good idea to write
highly concurrent and scalable services in Python or Javascript in this post).

Next to providing a mechanism to handle concurrent IO, the use of greenlets to
manage threads of execution also introduces another useful property of gevent:
instead of threads backed by OS threads, executed in parallel on an SMP system,
using preemptive multitasking, the lightweight threads are scheduled in
userspace by the library itself, in a cooperative manner: the places at which a
thread of execution yields execution (also known as ‘switch’ in gevent) are
explicitly defined inside the code (whether it’s yours or in some library). As
such one can make assumptions (or rather, assertions) about code which uses
mutable data shared across execution threads: it can be thread-safe in
the cooperative setting, whilst it would be unsafe in a preemptive scenario.

The latter raised an interesting question from a colleague: is there a way to
assert some code will not yield execution, i.e. some section will always be
executed as an atomic block? This is, obviously, an important consideration
when relying on this property when writing code without locks which would
become incorrect if the thread could be switched out!

I answered (maybe to their surprise) that this is certainly possible and standard
practice in some languages, yet as far as I know not in Python (or, at least,
not very elegantly). I didn’t get into this any further at the time, yet here’s a
post in which we will devise such a tracking system.

It’s based on several concepts from the world of Functional Programming, yet
don’t fear: you don’t need any knowledge about any FP concepts at all. Read on
and enjoy the ride!

Asynchronous Programming

As you have most likely heard, there are 2 ways data can be provided to something
making a request: synchronously, or asynchronously. In the former case, the
requesting entity will wait (block) until it receives a response, then
continue execution. When using the asynchronous pattern, some code will issue a
request, then tell the system what to do when the request has completed and a
result is available, and then continue working on something else, or yield
execution to some other execution thread (please make sure to make a strong
distinction between thread of execution and system thread: there might be
some relation between these in some programming patterns, yet conceptually
they’re different beasts!).

Note there’s a major difference between execution mechanisms (sync vs. async)
and APIs: whilst some APIs explicitly expose their asynchronous execution
nature by using callbacks (think node.js, Twisted’s Deferred type, which is a
sort of callback-on-steroids,…), others might look as if every call is a
blocking procedure call, yet internally everything is dispatched using some
asynchronous mechanism (think CPS, ‘Future’-style APIs which get compiled
into something using CPS, or (you guessed it) systems like gevent or the IO
manager and microthreads as found in GHC’s Haskell runtime).

The differences (or should I say, similarities?) of these styles deserve a
complete post on their own though!

Enter Types

Whenever we want to track something at a code level (which is basically, at
compile time), all we can use is what is known at this compile time: types. We
can’t rely on any values (since there are no actual values available! Go
figure: since much use of asynchronous programming is found when handling IO,
it’d be funny to know what we’ll receive on a socket at runtime while compiling
the program)!

We invent a name for the type which will be used to tag all values which we
received thanks to some sort of asynchronous call (which might introduce a
yield point): we’ll call it async. Now this is rather useless: async
what?

Those of you who toyed with generics before in Java, C#, Scala or anything
alike might think the answer is rather trivial: we create a generic type to
tag the actual type of the value we retrieved using an async call! These
generic types are also known as higher-kinded types in FP, yet you shouldn’t
care too much about this for the rest of this discourse.

So, we now got our type, written as 'a async (using OCaml notation). This is
similar to Async<A> in Java and C# (as far as I remember). For those new to
generics: think of a list of items. One could have a list of strings, one of
ints, myobjects or anything else. The types of these lists will then be
string list, int list and myobject list. We can write functions which can act on
any kind of list (e.g. check whether the list is empty: it doesn’t matter
what type of values the list contains!). As such the list type is defined as a
more generic type which takes a type parameter: 'a list. It’s
parametrised over type 'a.

So, char async is a char we obtained using some async action, and
a value of type string async is a string we got through some async call (it’s
an action which will yield a string value). Do note string async and string
are completely different types, as such it’s not (and should not be!) possible
to pass a value of type string async to a function which takes a string
value as its input. You could consider a value of type string to be a real
string value, a sequence of characters, whilst a value of type string async
is the representation of "a string value we’ll retrieve using some async call",
which is not the value itself.

Note this type comes pretty close to a Future, which is also often
parametrised (a generic class) Future<A>, which signifies "a box which will
eventually contain a value of type A".

Primitives

For our system to work, we’ll need a couple of primitive, magic operations,
provided by the system, which are the basic building blocks of everything
above. These are the operations which implement the most low-level async
procedures which can only be provided by the runtime system. Everything else
can be constructed on top of these by combining functions and actions and as
such building our applications, composing entities into larger entities.

We will only introduce 3 of these primitives, which should be sufficient for
the mechanism outlined in this post to be clear. In a real, production library
or runtime system one would need a couple more of these primitive actions to be
able to do something interesting, of course!

Of these 3 actions, 2 work on file descriptors: one writes a given string to a
given file descriptor and doesn’t return any result (we assume the data will
always be written at once, and this will never fail, unlike the write(2)
system call, obviously), the other reads a string of requested length from a
given file descriptor (the same principle with regard to success and failure
applies).

The third action allows a thread to sleep for a given amount of time (say,
seconds, although that obviously doesn’t make any difference). The result of
this action contains the number of seconds the thread actually did sleep.

Here are the prototypes. Note unit is a type with only a single value, (),
which is used when some other languages use void (it’s not exactly the same
thing, but for now this comparison should hold):

write : file_descr -> string -> unit async
read : file_descr -> int -> string async
sleep : int -> int async

Keeping the Genie in the Bottle

Ok, first steps are done: we got a type to wrap values which were obtained
using an async action, and we got a couple of actions defined. Next step: doing
something useful with these operations!

Let’s start with a very simple exercise: writing an action which reads a single
character from standard input (a value named stdin of type file_descr) and
writes it to standard output (a value named stdout of type file_descr).

First, we’ll create 2 utility functions to handle standard input and output, by
using partial application (if you don’t know what this is, consider the
following example: given the function mul : int -> int -> int which
multiplies 2 integers, we can use partial application and call mul with a
single argument, e.g. 10, which results in a function of type int -> int.
Whenever passing an argument to this function, the result will be 10 times this
argument):

let read_stdin = read stdin
let write_stdout = write stdout

The types:

read_stdin : int -> string async
write_stdout : string -> unit async
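Partial application is easy to try out in isolation. Here’s a minimal sketch using the mul function from the explanation above (the name times_ten is made up for this example):

```ocaml
(* mul takes two integers and multiplies them. *)
let mul (x : int) (y : int) : int = x * y

(* Applying mul to a single argument yields a function of type int -> int. *)
let times_ten : int -> int = mul 10

(* Prints 40. *)
let () = Printf.printf "%d\n" (times_ten 4)
```

read_stdin and write_stdout are built in exactly the same way: read and write each get their first argument, and what remains is a function waiting for the second one.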

Now we could try to write our program:

let attempt () =
    let char = read_stdin 1 in
    let _ = write_stdout char in
    ()

Failure all over! First of all, the code will not type-check:

Error: This expression has type string async
       but an expression was expected of type string

referring to the char argument on the line calling write_stdout.

Here’s why: write_stdout wants its first argument to be of type string
(this is the type of the second argument of write, as you know), but the
value we provide, named char, is not of type string: its type is
string async, the return type of the read_stdin (or further down read)
action!

Next to that, even if the code did type-check, our initial goal would have
failed: the type of attempt would be unit -> unit, which doesn’t signify
we used some of the async primitives at all! Our action must return something
of type unit async, and there should be no way to write an action whose
return type is not 'a async for some type 'a!

Back to the drawing board… It looks like standard assignment and passing
values around as-is won’t work (remember I stressed it’s important to make the
distinction between a string value and something representing some string
retrieved using some async action of type string async?).

We dig into our FP toolkit once more, and grab another hammer (after the type
system we used earlier): function composition! Or, in this case, action
composition.

What we want is a function we can use to link two actions together into a
new action!

Let’s try to figure out some type signature for such function:

link : 'a async -> 'b async -> 'b async

link takes an initial action which yields something of type 'a, a second
action which yields a 'b, and combines these into an action which also yields
a 'b.

But wait, this can’t be right either! In this setup, the second action still
has no way to retrieve and actually use the 'a value as encapsulated by the
first action!

We need to extend our link function to unwrap whatever the first given action
yields, then pass it to the second argument as a proper value it can use,
whilst still making sure the fact there’s something asynchronous going on is
retained.

Here’s what the type of our link function should look like:

link : 'a async -> ('a -> 'b async) -> 'b async

The second argument now became a function (cool, uh, functions taking functions
as arguments? These are also called "higher-order functions" (remember
"higher-kinded types"?) and are very powerful) which takes a value of type
'a, and results in an action of type 'b async, which is then used as the
result of the link function.

Note how you can see, only by looking at the type signature, that the actual 'a
value can never be leaked outside of some 'b async action, since that’s
what the second argument must return, and only this function ever gets access
to the plain 'a value?

Do note we will not discuss how link is implemented, since this is
unrelated to this post, and depends on how the whole library is designed and
implemented!
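Still, to make the signatures above a bit more tangible, here’s one possible toy model. It is emphatically not how a real library would do it: it assumes an action is nothing more than a delayed computation, hides that behind an abstract type, and fakes file descriptors with in-memory strings. The names Async, of_string, contents and run are invented for this sketch. run exists only so the example can be executed; in a real system only the runtime would be able to do that.

```ocaml
module Async : sig
  type 'a async                 (* abstract: can't be unwrapped from outside *)
  type file_descr
  val link : 'a async -> ('a -> 'b async) -> 'b async
  val read : file_descr -> int -> string async
  val write : file_descr -> string -> unit async
  (* Helpers for this sketch only. *)
  val of_string : string -> file_descr
  val contents : file_descr -> string
  val run : 'a async -> 'a
end = struct
  (* Assumption: an action is a suspended computation. *)
  type 'a async = unit -> 'a
  type file_descr = { input : string; mutable pos : int; output : Buffer.t }

  (* Run the first action, hand its result to f, run the action f returns. *)
  let link a f = fun () -> f (a ()) ()

  (* Read at most n characters of the fake input. *)
  let read fd n = fun () ->
    let n = min n (String.length fd.input - fd.pos) in
    let s = String.sub fd.input fd.pos n in
    fd.pos <- fd.pos + n;
    s

  let write fd s = fun () -> Buffer.add_string fd.output s

  let of_string input = { input; pos = 0; output = Buffer.create 16 }
  let contents fd = Buffer.contents fd.output
  let run a = a ()
end

let fake_stdin = Async.of_string "hello"
let fake_stdout = Async.of_string ""

(* A single-character echo, built from two actions joined by link. *)
let echo_one = Async.link (Async.read fake_stdin 1) (Async.write fake_stdout)

let () =
  Async.run echo_one;
  print_endline (Async.contents fake_stdout)
```

Because 'a async is abstract in the signature, code outside the module has no way to get at the plain value except by going through link, which is the property we’re after.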

Let’s get back to our single-character echo action: using link, we need an
initial action of type 'a async. We got that: read_stdin 1 has type
string async. Ok, next we need a function of type 'a -> 'b async. We know
what 'a is now, since we already decided to use read_stdin 1 as first
argument, so 'a is string. We’re looking for a function of type
string -> 'b async which writes the given string to the screen. This is easy:
write_stdout has exactly this type, using unit for 'b!

Here goes:

let echo_one = link (read_stdin 1) write_stdout

The type of echo_one is unit async, like we want it to be!

From now on though, we won’t use the name link anymore: this is just
something I made up. A more common name for such function is bind, which
we’ll use from now on. Next to that, there’s an infix operator (an infix
operator is a function which takes 2 arguments and is placed in-between
these arguments instead of before them, like the + in 1 + 2) called >>=.
This allows us to rewrite our echo_one action like this:

let echo_one' = read_stdin 1 >>= write_stdout

Let’s make things more interesting: writing an action which reads 10 characters
from standard input, then sleeps at most 2 seconds, then writes these
characters to some given file descriptor:

let sleepy_writer out =
    read_stdin 10 >>= fun data ->
    sleep 2 >>= fun _ ->
    write out data

You might notice the use of indentation and anonymous functions, and we ignore
the result of the sleep action (we use _ as its binding name), but the
code should be easy to understand.

If this isn’t clear, here’s how to read the above snippet: we define a function
called sleepy_writer which takes a single argument called out. Upon
invocation, the function will result in 10 chars to be read from stdin.
read_stdin 10 is an action which, upon completion, will yield a string. We
bind a function which takes this string value (which we bind to the name
data in an anonymous function) and returns another action: everything starting
with sleep up until the very end of the function body. So, once read_stdin 10
has completed, we continue with this next action, which will make the
current thread of execution sleep for 2 seconds. Once again, we bind this to a
function which takes the value which the sleep 2 action yields and ignores
this (by binding it to the name _), then results in one last action which will
be executed as well, and will write the data value (which is at that moment in
scope!) to the given out file descriptor. The result of the sleepy_writer
action will be the result of the write action.

Try to figure out the type of sleepy_writer. Got it?

sleepy_writer : file_descr -> unit async

Notice we didn’t get rid of the async marker!
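As an aside: OCaml 4.08 and later allow defining binding operators, which make such chains read like straight-line code while the types stay exactly the same. Below is a sketch under the assumption that an action is a plain suspended computation (a stand-in for a real runtime; a real library would keep the type abstract). The in-memory read_stdin, the sleep which doesn’t actually sleep and the Buffer used as output are all made up for illustration.

```ocaml
(* Stand-in model for this sketch: an action is a suspended computation. *)
type 'a async = Async of (unit -> 'a)

let run (Async f) = f ()
let bind a f = Async (fun () -> run (f (run a)))

(* Binding operator: "let* x = a in body" means "bind a (fun x -> body)". *)
let ( let* ) = bind

(* Hypothetical primitives working on in-memory data. *)
let read_stdin n = Async (fun () -> String.sub "0123456789abcdef" 0 n)
let sleep sec = Async (fun () -> sec)      (* only pretends to sleep *)
let write out data = Async (fun () -> Buffer.add_string out data)

(* Same structure as sleepy_writer, written with let*. *)
let sleepy_writer out =
  let* data = read_stdin 10 in
  let* _ = sleep 2 in
  write out data

let () =
  let out = Buffer.create 16 in
  run (sleepy_writer out);
  print_endline (Buffer.contents out)
```

The inferred type of this sleepy_writer is Buffer.t -> unit async: the syntax changed, the async marker didn’t go anywhere.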

Finally, an action which keeps reading chunks of data of given chunk size from
a given file descriptor, then writes it to another given file descriptor:

let rec copy_stream chunk_size in_fd out_fd =
    read in_fd chunk_size >>= fun data ->
    write out_fd data >>= fun () ->
    copy_stream chunk_size in_fd out_fd

Even though copy_stream is infinitely-recursive, its type can be calculated:

copy_stream : int -> file_descr -> file_descr -> 'a async

Yet again, the async marker sticks.

Do note, in real-world code, one should not use a top-level rec definition
but define some loop action in the body etc.

Return, But Not As You Know It

One last step in our journey is required. Up until now all actions we created
by combining other actions using bind had the result value of the last
action in this chain as their result, whilst in some actions we want to
calculate a result ourselves and yield that value. Due to the constraints we wanted to
achieve (and as imposed by the only function we can use to actually use
actions, bind) we can’t just use plain values: they need to be wrapped in the
async container as well. So here’s what we need: something which turns an
'a into an 'a async:

whatever_it_is_called : 'a -> 'a async

For some reason, this function is mostly called return. Don’t be mistaken,
this is completely unrelated to the return statement as found in
C/Java/C#/Python/PHP/… and is not related to ending the execution of a procedure
and signalling some result value or anything alike. It’s a normal function to put
a value in an async box, even though this value itself was not retrieved
using some async action as-is:

return : 'a -> 'a async

Given this, we can write some more interesting actions. As a first example,
let’s write an action which reads a single line from a given file descriptor by
reading characters one-by-one until it finds a newline (‘\n’) character, then yields
the line it read (without the newline):

let read_line fd =
    let rec loop acc =
        read fd 1 >>= fun char ->
        if char = "\n"
        then return acc
        else loop (acc ^ char)
    in
    loop ""

If you’re not versed in FP yet, this might be somewhat hard to read and
understand at first. Take a second look and follow the execution flow manually,
it’ll become clear. It might be useful to know the ^ operator is used to
concatenate strings:

(^) : string -> string -> string

Did you try to figure out the type of read_line? It’s as intended:

read_line : file_descr -> string async

One more example: since the sleep action might return even before the
requested number of seconds has passed (don’t ask me why, I just made that up),
we want to write an action which sleeps at least the given number of seconds,
and as little more as possible (otherwise we could sleep eternally. Nice
try!). We don’t care how long we slept in the end (which is a rather stupid
requirement: a serious API would return this value, and a caller is free to
ignore this).

Here we go:

let sleep_min sec =
    let rec loop left =
        sleep left >>= fun slept ->
        (* Done once this round covered everything that was left. *)
        if slept >= left
        then return ()
        else loop (left - slept)
    in
    loop sec

The type of sleep_min? int -> unit async.
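To see the loop at work, here’s a small simulation. It assumes, purely for the sake of the example, that an action is a suspended computation and that sleep never sleeps longer than 2 seconds per call; total_slept is a made-up counter which records the simulated time. The loop stops as soon as one round has covered everything that was left (slept >= left), so a sleep that sleeps exactly the remaining time ends it.

```ocaml
(* Stand-in model for this sketch: an action is a suspended computation. *)
type 'a async = Async of (unit -> 'a)

let run (Async f) = f ()
let return v = Async (fun () -> v)
let ( >>= ) a f = Async (fun () -> run (f (run a)))

(* Total simulated time slept, so the behaviour can be checked. *)
let total_slept = ref 0

(* Hypothetical sleep which wakes up early: at most 2 seconds per call. *)
let sleep sec =
  Async (fun () ->
    let slept = min sec 2 in
    total_slept := !total_slept + slept;
    slept)

let sleep_min sec =
  let rec loop left =
    sleep left >>= fun slept ->
    if slept >= left
    then return ()
    else loop (left - slept)
  in
  loop sec

let () =
  run (sleep_min 5);
  Printf.printf "slept %d\n" !total_slept
```

With this fake sleep, sleep_min 5 needs three rounds (2, 2 and 1 seconds) and ends up having slept 5 seconds in total.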

Conclusion

Here we are! Using the 'a async type, bind and return, we have a system
which allows us to combine and use asynchronous actions, whilst being sure we
can never forget something async is going on under the hood, no matter how
complex the actions themselves become. If we don’t see something of the
'a async type, we can be certain nothing is using the async primitives
underneath.

Notice how we were able to implement something which gives what we wanted from
the beginning, without any specific support in the language we’re using:
only fairly standard type system requirements, and functions, as available in
several languages, including OCaml, Haskell and several others (although in
languages without first-class functions etc., syntax might become an
issue, thinking of Java and alike).

Thanks to the use of types, the compiler can be very helpful during development
to reduce the number of potential runtime issues. Even though a system like the
above can be implemented in dynamically typed languages like Python or Ruby, having
compile-time type checking offers a lot of safety!

Note this has been a very basic introduction, so now comes…

Going From Here

Once you reached this point, you might want to get to know more about the
mechanics behind the system outlined above. As some might have heard, bind
and return are often used in the presence of monads, and indeed, one might
think our 'a async type is monadic (it might be, but not necessarily: the
monad laws won’t be fulfilled in the presence of real IO). Overall monads
provide a way to track "things with a tag which should always keep this tag".
The above is a very informal definition and introduction, but the interested
reader might refer to one of the many monad tutorials available on the internet
(all of varying quality and usefulness).

Next to this, reading up on functors (not the OCaml kind, but things with the
function fmap : ('a -> 'b) -> 'a f -> 'b f) could be useful as well (our
'a async type is a functor, like any other monad:
let fmap f a = a >>= fun v -> return (f v)).
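Here’s that fmap definition in a form which can be run. As before, the model in which an action is a suspended computation is only an assumption made for this sketch, and read_greeting is a made-up action:

```ocaml
(* Stand-in model for this sketch: an action is a suspended computation. *)
type 'a async = Async of (unit -> 'a)

let run (Async f) = f ()
let return v = Async (fun () -> v)
let ( >>= ) a f = Async (fun () -> run (f (run a)))

(* fmap applies a plain function to the value an action will yield. *)
let fmap f a = a >>= fun v -> return (f v)

(* Hypothetical action yielding a string. *)
let read_greeting = Async (fun () -> "hello")

(* Still an action: the type is int async, not int. *)
let greeting_length = fmap String.length read_greeting

let () = Printf.printf "%d\n" (run greeting_length)
```

In this pure toy model, mapping the identity function over an action yields the same result as the action itself, which is one of the functor laws.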



Tracing block device write request sizes in Linux using SystemTap

Recently some people started to wonder how Arakoon, our distributed key-value store, handles the drive on which data is stored. To be more precise, this boils down to how Tokyo Cabinet (which we currently use as database implementation) submits write requests to the kernel, and how the kernel processes these later on, after translating the filesystem-level requests into write operations in the block layer.

This is considered important to know due to the usage of SSD drives (think wear leveling and other issues coming with these devices).

Tracing how the application writes data to a file can be achieved using the strace tool, looking for write(2) and pwrite(2) calls (although this is not necessarily sufficient: the application might be writing to a file using mmap(2) memory mappings).

On most loads, this won’t correspond to how block requests are sent to the actual hardware, though: writes can be buffered, reordered, etc. What we want is a way to gather information of write requests in the kernel block layer.

In Linux, one can use SystemTap (which is somewhat like DTrace on Solaris) to hook into the kernel at runtime and collect statistics, or lots of other information. One writes a script in which one hooks into trace points or function calls, then processes the incoming information.

I won’t go into the installation of SystemTap, since this will depend on the distribution you’re running. Do note you might not need root access to execute the scripts (so you shouldn’t use it either!): on my Fedora 16 system, all I had to do was put my user in the stapsys, stapusr and stapdev groups (although I’m not sure I really need to be in all of those…). You’ll need debugging symbol files of your running kernel available, among other things (in my experience attempting to run a script on a system where some prerequisites are missing will result in the stap tool telling you what you need).

Writing SystemTap scripts sometimes requires some knowledge of kernel internals, to know where to hook in. Since I wanted to gather some statistics about block-layer calls, I knew I had to look for calls in the ‘bio’ subsystem, short for “Block IO”. Luckily SystemTap comes with some predefined combinations of hooks, called ‘tapsets’. These are stored (on my system at least) in /usr/share/systemtap/tapset. One of these files is called ioblock.stp, which sounds interesting. Since I was too lazy to look for documentation, I opened the file in a pager and read through it. The ioblock.request tap looked interesting, which provides access to a size value. After reading some example SystemTap scripts, here’s what I came up with:

global writes

probe ioblock.request {
    if(bio_rw_num(rw) == BIO_WRITE)
        writes[devname] <<< size
}

probe end {
    printf("\n")
    foreach([devname] in writes-) {
        printf("Device: %s\n", devname)
        println(@hist_log(writes[devname]))
    }
}

Running this using the stap tool, and closing it after a while (using CTRL-C) gives the following output:

$ stap -v blockio.stp
Pass 1: parsed user script and 92 library script(s) using 210196virt/30852res/3136shr kb, in 140usr/20sys/251real ms.
Pass 2: analyzed script: 3 probe(s), 21 function(s), 2 embed(s), 2 global(s) using 452960virt/226472res/92556shr kb, in 1500usr/100sys/2541real ms.
Pass 3: translated to C into "/tmp/stapeJldxD/stap_e40cc31c16b7dd290dabd43749c577a4_12507_src.c" using 452960virt/226600res/92684shr kb, in 10usr/0sys/12real ms.
Pass 4: compiled C into "stap_e40cc31c16b7dd290dabd43749c577a4_12507.ko" in 1780usr/350sys/2595real ms.
Pass 5: starting run.
^C
Device: sda5
value |-------------------------------------------------- count
    0 |@@@@@@@@@@@@@@@                                    30
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |                                                    1
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@                        54
 8192 |@@@                                                 7
16384 |@                                                   3
32768 |                                                    0
65536 |                                                    0

Device: dm-2
value |-------------------------------------------------- count
    0 |@@                                                  5
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |@                                                   2
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                      58
 8192 |@@@@@@@                                            14
16384 |@@@                                                 6
32768 |                                                    0
65536 |                                                    0

Device: dm-4
value |-------------------------------------------------- count
    0 |@                                                   2
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |@                                                   2
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                      58
 8192 |@@@@@@@                                            14
16384 |@@@                                                 6
32768 |                                                    0
65536 |                                                    0

Device: dm-1
value |-------------------------------------------------- count
 1024 |                                                    0
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ 50
 8192 |                                                    0
16384 |                                                    0

Device: sda
value |-------------------------------------------------- count
 1024 |                                                   0
 2048 |                                                   0
 4096 |@@                                                 2
 8192 |                                                   0
16384 |                                                   0

Pass 5: run completed in 10usr/50sys/15274real ms.

This is a histogram of bio write requests to all block devices on my system while the script was running. If you see lots of requests smaller than the page size on an SSD device (this depends on the hardware), you might want to look into the write patterns of your application (although even then, things might not be as bad as they look at first sight, since the SSD drive could perform batching, remapping and whatever else internally).
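When reading such a histogram it helps to know which row a given request size ends up in. The sketch below assumes a row is labelled with the largest power of two which is not larger than the value, with a separate row for 0. That is how I read the output above, not something taken from the SystemTap sources. It is written in OCaml, and the function name bucket is made up:

```ocaml
(* Label of the power-of-two row a request size falls into:
   0 for size 0, otherwise the largest power of two <= size. *)
let bucket size =
  if size <= 0 then 0
  else
    let rec go b = if b * 2 > size then b else go (b * 2) in
    go 1

let () =
  List.iter
    (fun size -> Printf.printf "%6d bytes -> row %d\n" size (bucket size))
    [ 0; 512; 4096; 5000; 12288 ]
```

Under this assumption a 12288-byte request (three 4096-byte pages) is counted in the 8192 row, so a row label is a lower bound on the request size, not the exact size.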

Do note the script output displayed above is not the behavior of Arakoon or Tokyo Cabinet: it depicts a rather standard desktop session (running Google Chrome, Evolution, Empathy, Skype and some more applications) during a very short timespan. The test was executed on a Fedora 16 laptop system, containing a spinning hard drive, using a combination of Ext4 and XFS on several LVM2 volumes on kernel 3.2.7-1.fc16.x86_64.


Baardskeerder’s transaction strategy

Baardskeerder is a simple embedded database based around an append-only B-treeish datastructure.
It’s a dictionary that also supports range queries and transactions.
Baardskeerder is implemented in OCaml, and the main idea is that it will replace Tokyo Cabinet in our Arakoon Key-Value Store.
This post will try to explain our approach regarding transactions.

Preliminaries

Baardskeerder appends slabs to a log. Slabs are lists of entries, and entries can be
either values, leaves, indexes, or commits. Suppose we start with an empty log, and add a first update (set “f” “F”).
Baardskeerder builds a slab that looks like this:

Value "F"
Leaf ["f", Inner 0]
Commit (Inner 1)

This slab gets serialized, and added to a log, which looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)

Notice there are Outer and Inner references. Outer ones refer to positions in the log, while Inner ones refer to the current slab. During serialization, each inner position needs to be translated into an outer one, while outer positions can simply be copied. (In reality, Outer positions are a bit more complex to calculate as the size of entries after serialization influences them, but I’ve abstracted away from that problem here.)

When a slab is ready to be serialized and added, it always ends with a Commit node. This commit node is necessary as there might be a calamity while writing out the serialized slab and it might not end up entirely in the log. So after a successful update, the last entry in the log is a Commit pointing to the current root node.
To fully appreciate the difference between slab and log references,
let’s add another update (set “d” “D”).
The new slab looks like this:

Value "D"
Leaf ["d", Inner 0; "f", Outer 0]
Commit (Inner 1)

The new leaf in the slab refers both to the value at position 0 in the log (Value “F”) and to the value at position 0 in the slab (Value “D”).
After serialization of the slab, and writing it to the log, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)
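The translation from Inner to Outer positions can be sketched in a few lines. This is not Baardskeerder’s actual code: it uses the same simplification as this post (a position is just the index of an entry), models the log as a plain list, leaves out Index entries, and the names externalize, externalize_entry and append are made up.

```ocaml
type pos = Inner of int | Outer of int

type entry =
  | Value of string
  | Leaf of (string * pos) list
  | Commit of pos

(* An Inner position becomes an Outer one by adding the offset at which
   the slab starts in the log; Outer positions are copied unchanged. *)
let externalize offset = function
  | Inner i -> Outer (offset + i)
  | Outer _ as p -> p

let externalize_entry offset = function
  | Value _ as e -> e
  | Leaf kps -> Leaf (List.map (fun (k, p) -> (k, externalize offset p)) kps)
  | Commit p -> Commit (externalize offset p)

(* Appending a slab: translate every entry, then add it to the log. *)
let append log slab =
  log @ List.map (externalize_entry (List.length log)) slab

let after_f = append [] [ Value "F"; Leaf [ ("f", Inner 0) ]; Commit (Inner 1) ]

let after_d =
  append after_f
    [ Value "D"; Leaf [ ("d", Inner 0); ("f", Outer 0) ]; Commit (Inner 1) ]
```

The second slab starts at position 3 in the log, so Inner 0 becomes Outer 3 and Inner 1 becomes Outer 4, which gives the six entries listed above.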

Let’s add some more updates to have a more interesting log:

 set "h" "H";
 set "a" "A";
 set "z" "Z"
 

After which, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)
Value "H"
Leaf ["d", Outer 3; "f", Outer 0; "h", Outer 6]
Commit (Outer 7)
Value "A"
Leaf ["a", Outer 9; "d", Outer 3]
Leaf ["f", Outer 0; "h", Outer 6]
Index Outer 10, ["d", Outer 11]
Commit (Outer 12)
Value "Z"
Leaf ["f", Outer 0; "h", Outer 6; "z", Outer 14]
Index Outer 10, ["d", Outer 15]
Commit (Outer 16)

This corresponds to the following tree:
[Figure: tree after 5 sets]

There are several little things to notice here. First, leaves that overflow (size = 4) are split into two new leaves with an Index entry to refer to them both. Also, the entries in the log only refer to previous entries, and, most importantly, nothing ever changes inside the log.

Transactions

Suppose that, by some accident, the process dies while writing a slab, so the last slab doesn’t quite fully make it to the log. In the above example, the log might end just after the value “Z” (or a bit further with half a leaf written).
At the next opening of the log, we’ll search for the last commit, and the log gets truncated just after position 13. As such, it is the presence of Commit entries that makes updates atomic.
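A rough sketch of that recovery step, again with the log modelled as a plain list of entries (so a half-written entry simply doesn’t show up) and without Index entries. The names recover and take are made up; this is not the real implementation.

```ocaml
type pos = Outer of int

type entry =
  | Value of string
  | Leaf of (string * pos) list
  | Commit of pos

(* The first n elements of a list. *)
let rec take n = function
  | x :: rest when n > 0 -> x :: take (n - 1) rest
  | _ -> []

(* Keep everything up to and including the last Commit, drop the rest.
   A log without any Commit recovers to the empty log. *)
let recover log =
  let rec last_commit i best = function
    | [] -> best
    | Commit _ :: rest -> last_commit (i + 1) (Some i) rest
    | _ :: rest -> last_commit (i + 1) best rest
  in
  match last_commit 0 None log with
  | None -> []
  | Some n -> take (n + 1) log

(* The first slab made it to the log, the second one was cut short. *)
let torn =
  [ Value "F"; Leaf [ ("f", Outer 0) ]; Commit (Outer 1);
    Value "D"; Leaf [ ("d", Outer 3); ("f", Outer 0) ] ]

let recovered = recover torn
```

Here recovered contains only the first three entries: the torn second update disappears completely, and all it takes to decide that are the Commit entries.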

They also provide a means to implement non-concurrent transactions. In fact, if we only have a commit entry at the end of a slab, the slab literally is the transaction. At the beginning of the transaction, we create a slab, and we pass it along for all updates. A commit entry is written at the end of the slab when we’re done. The only added complexity comes from descending the tree:
We must start at the root in the slab (or in the log if the slab is empty), and whenever we have to jump to an entry, we look at its position: we jump into the slab if it’s an Inner position, and into the log if it’s an Outer one. (How to descend a tree is described in more detail in a previous blog post.)
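
A minimal sketch of that jump, assuming both the log and the slab are plain arrays of entries. The helpers fetch and find_in_leaf are hypothetical names for this example, not part of the Baardskeerder API.

```ocaml
type pos = Outer of int | Inner of int

type entry =
  | NIL
  | Value of string
  | Leaf of (string * pos) list
  | Index of pos * (string * pos) list
  | Commit of pos

(* Fetch the entry a position refers to: Inner positions index the
   slab of the running transaction, Outer positions index the log. *)
let fetch ~(log : entry array) ~(slab : entry array) = function
  | Outer i -> log.(i)
  | Inner i -> slab.(i)

(* Look up a key in a single leaf and follow its position. *)
let find_in_leaf ~log ~slab k = function
  | Leaf kps ->
    (match List.assoc_opt k kps with
     | Some p -> Some (fetch ~log ~slab p)
     | None -> None)
  | _ -> None

(* The log after the first update, and the not yet written slab of
   the second update (set "d" "D"). *)
let log = [| Value "F"; Leaf ["f", Outer 0]; Commit (Outer 1) |]
let slab = [| Value "D"; Leaf ["d", Inner 0; "f", Outer 0] |]

let d = find_in_leaf ~log ~slab "d" slab.(1)
let f = find_in_leaf ~log ~slab "f" slab.(1)
```

Looking up "d" stays inside the slab, while "f" jumps out to position 0 of the log, even though both start from the same leaf.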

Our transactional API is simple and clean:

 module DBX :
  sig
    type tx
    val get : tx -> k -> v
    val set : tx -> k -> v -> unit
    val delete : tx -> k -> unit

    ...

    val with_tx : log -> (tx -> unit) -> unit
  end

The following code fragment illustrates our transaction API:

let xs = ["f","F";
	  "d","D";
	  "h","H";
	  "a","A";
	  "z","Z";
	 ]
in
DBX.with_tx log
  (fun tx ->
    List.iter (fun (k,v) ->
      DBX.set tx k v) xs
  )

Suppose we apply this transaction to an empty log. We then end up with the following:

Value "F"
Leaf ["f", Outer 0]
Value "D"
Leaf ["d", Outer 2; "f", Outer 0]
Value "H"
Leaf ["d", Outer 2; "f", Outer 0; "h", Outer 4]
Value "A"
Leaf ["a", Outer 6; "d", Outer 2]
Leaf ["f", Outer 0; "h", Outer 4]
Index Outer 7, ["d", Outer 8]
Value "Z"
Leaf ["f", Outer 0; "h", Outer 4; "z", Outer 10]
Index Outer 7, ["d", Outer 11]
Commit (Outer 12)

Performance Summary: each transaction boils down to 1 slab, and each slab is written using 1 system call (write).

Slab Compaction

Batching updates in transactions uses less space compared to individual updates, but we still have a lot of dead entries. Take for example the log above. Entries 1, 3, 5, 8 and 9 are not useful in any way. So it would be desirable to prune them from the slab before they even hit the log.
This turns out to be surprisingly easy.

First we have to find the dead entries, which is done with a simple iteration through the slab, starting from the root, and marking every Inner position as known. When you’re done, the positions that you didn’t mark are garbage.

type t = { mutable es: entry array; mutable nes: int}

let mark slab =
  let r = Array.make (slab.nes) false in
  let maybe_mark = function
    | Outer _ -> ()
    | Inner x -> r.(x) <- true
  in
  let maybe_mark2 (_,p) = maybe_mark p in
  let mark _ e =
    match e with
      | NIL | Value _ -> ()
      | Commit p -> maybe_mark p
      | Leaf l -> List.iter maybe_mark2 l
      | Index (p0,kps) -> let () = maybe_mark p0 in List.iter maybe_mark2 kps
  in
  let () = iteri_rev slab mark in
  let () = r.(slab.nes -1) <- true in
  r

After we have marked the live entries (everything left unmarked is dead), we iterate through the slab again, from position 0 towards the end, and build a mapping that tells us how to translate an old Inner position into a new Inner position.

let mapping mark =
  let s = Array.length mark in
  let h = Hashtbl.create s in
  let rec loop i o =
    if i = s then h
    else
      let v = mark.(i) in
      let i' = i + 1 in
      let () = Hashtbl.add h i o in
      let o' = if v then o + 1 else o in
      loop i' o'
  in
  loop 0 0
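
To make the mapping concrete, here is a small sketch that computes the same translation with an array instead of a Hashtbl. The name new_positions is made up for this example, and the marks are what I get when tracing mark by hand over the 14-entry slab of the example transaction.

```ocaml
(* Array-based variant of the position mapping: new_pos.(i) is the
   number of live entries before old position i. *)
let new_positions (mark : bool array) : int array =
  let n = Array.length mark in
  let new_pos = Array.make n 0 in
  let live = ref 0 in
  for i = 0 to n - 1 do
    new_pos.(i) <- !live;
    if mark.(i) then incr live
  done;
  new_pos

(* Hand-traced marks for the example slab: positions 1, 3, 5 and 9
   are left unmarked. *)
let marks = [| true; false; true; false; true; false; true;
               true; true; false; true; true; true; true |]

let m = new_positions marks
```

For instance m.(6) is 3 and m.(2) is 1, so a leaf holding "a", Inner 6 and "d", Inner 2 is rewritten to "a", Inner 3 and "d", Inner 1. Entries for dead positions are never looked up, so their values don’t matter.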

The last phase is the actual rewriting of the slab, without the dead entries. This again is a simple iteration.

let rewrite s s_mark s_map =
  let lookup_pos = function
    | Outer x -> Outer x
    | Inner x -> Inner (Hashtbl.find s_map x)
  in
  let rewrite_leaf kps       = List.map (fun (k,p) -> (k,lookup_pos p)) kps in
  let rewrite_index (p0,kps) = (lookup_pos p0 , rewrite_leaf kps) in
  let rewrite_commit p       = lookup_pos p in
  let esa = s.es in
  let size = s.nes in
  let r = Array.make size NIL in
  let rec loop c i =
    if i = size
    then { es = r; nes = c}
    else
      begin
	let i' = i + 1 in
	let a = s_mark.(i) in
	if a then
	  let e = esa.(i) in
	  let e' = match e with
	    | Leaf  l  -> Leaf (rewrite_leaf l)
	    | Index i  -> Index (rewrite_index i)
	    | Commit p -> Commit (rewrite_commit p)
	    | Value _
	    | NIL -> e
	  in
	  let () = r.(c) <- e' in
	  let c' = c + 1 in
	  loop c' i'
	else
	  loop c i'
      end
  in
  loop 0 0

Gluing it all together:

let compact s =
  let s_mark = mark s in
  let s_map = mapping s_mark in
  rewrite s s_mark s_map

Here is the slab for our example transaction of 5 sets before compaction:

Value "F"
Leaf ["f", Inner 0]
Value "D"
Leaf ["d", Inner 2; "f", Inner 0]
Value "H"
Leaf ["d", Inner 2; "f", Inner 0; "h", Inner 4]
Value "A"
Leaf ["a", Inner 6; "d", Inner 2]
Leaf ["f", Inner 0; "h", Inner 4]
Index Inner 7, ["d", Inner 8]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 4; "z", Inner 10]
Index Inner 7, ["d", Inner 11]
Commit (Inner 12)

and after compaction:

Value "F"
Value "D"
Value "H"
Value "A"
Leaf ["a", Inner 3; "d", Inner 1]
Leaf ["f", Inner 0; "h", Inner 2]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 2; "z", Inner 6]
Index Inner 4, ["d", Inner 7]
Commit (Inner 8)

We can see it’s worth the effort.
Of course, the bigger the transaction, the bigger the benefit of compaction.
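
One detail in the listing above: the leaf holding only “f” and “h” (old position 8) survives compaction, although nothing live refers to it. As far as I can tell, this is because mark follows the references of every entry it visits, dead ones included, and the dead Index at old position 9 still points to position 8. A stricter variant only follows references from entries that are themselves marked. The sketch below is an illustration, not the Baardskeerder code; mark_reachable is a made-up name and it takes the entry array and its length directly.

```ocaml
type pos = Outer of int | Inner of int

type entry =
  | NIL
  | Value of string
  | Leaf of (string * pos) list
  | Index of pos * (string * pos) list
  | Commit of pos

(* Mark only what is reachable from the last entry. References always
   point backwards, so one pass from the end suffices: by the time we
   reach position i, every entry that could refer to it has been seen. *)
let mark_reachable (es : entry array) (nes : int) : bool array =
  let r = Array.make nes false in
  let mark_pos = function
    | Outer _ -> ()
    | Inner x -> r.(x) <- true
  in
  if nes > 0 then r.(nes - 1) <- true;
  for i = nes - 1 downto 0 do
    if r.(i) then
      match es.(i) with
      | NIL | Value _ -> ()
      | Commit p -> mark_pos p
      | Leaf kps -> List.iter (fun (_, p) -> mark_pos p) kps
      | Index (p0, kps) ->
        mark_pos p0;
        List.iter (fun (_, p) -> mark_pos p) kps
  done;
  r

(* The example slab before compaction. *)
let slab = [|
  Value "F"; Leaf ["f", Inner 0];
  Value "D"; Leaf ["d", Inner 2; "f", Inner 0];
  Value "H"; Leaf ["d", Inner 2; "f", Inner 0; "h", Inner 4];
  Value "A"; Leaf ["a", Inner 6; "d", Inner 2];
  Leaf ["f", Inner 0; "h", Inner 4];
  Index (Inner 7, ["d", Inner 8]);
  Value "Z"; Leaf ["f", Inner 0; "h", Inner 4; "z", Inner 10];
  Index (Inner 7, ["d", Inner 11]);
  Commit (Inner 12);
|]

let live = mark_reachable slab (Array.length slab)
```

With this variant the unmarked positions are exactly 1, 3, 5, 8 and 9, the entries called dead earlier, and the compacted slab would shrink to 9 entries.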

Question to my 2.5 readers: If you’ve seen a paper, blog, article,… describing this, please be so kind as to send me a link.

Offline log compaction

Given Baardskeerder has transactions with slab compaction, we can build a simple and efficient offline compaction algorithm for log files. Start from the root, walk over the tree while building big transactions (as big as you want, or as big as you can afford), and add these to a new log file. When you’re done, you have your compacted log.

Online log compaction

Baardskeerder logs can also return garbage to the filesystem while they are in use, but that will be kept for a later post.

have fun,

Romain.

Baardskeerder is Afrikaans for barber, but also refers to an order of creatures called Solifugae.
A more marketing-prone name would have been Mjolnir Mach3 Ultra Turbo Plus Enterprise Edition with Aloe vera.

What’s in a name?


What are you playing with today?

introduction

A colleague of mine recently asked me how I do research, and where the ideas come from.
The conversation went something like this:

NN: yeah, but how do you learn all these things?
ME: It’s a side-effect, really. Let me explain: when in life do you learn the most?
NN: It must be childhood…
ME: Yes, and why is that?
NN: The brain is more flexible when you’re young…
ME: Yes, that helps considerably, but that’s not the full answer.
The thing that especially helps is that you were playing.
NN: So, you’re telling me I should play more?
ME: Yes, but in order to get away with it, you call it research.

Currently, I’m playing with (I mean: doing research about) persistent data structures.
The literature uses a number of different names (CoW, append-only, log-structured, persistent, …) for structures where updates (at least conceptually)
never change anything, which means you have access to all previous versions.
This is associated with a lot of buzz-words like instant snapshotting, cheap transactions, ssd optimized, hot copy, …

Append-only binary tree

To illustrate the concept, I’ll show you how to play with an append-only binary-ish tree (a B-tree would just be more work, without additional fun).

binary tree variation

Let’s start off with some pictures of the data structure we want to implement.
The append-only representation will be shown in the next section.
Initially, the tree is empty, and there is nothing to show, but after adding the first key value pair (“f”, “F”), which just means that “f” is mapped to “F”,
the tree looks like this:



A key node (shown as an oval, and called a leaf) just points to a value node (shown as a box).

This is the tree after we insert a second mapping (“d” maps to “D”):



A branch node (shown as a trapezium) splits the key space in 2 parts: everything smaller than or equal to its separator (“d”) dangles left, everything else dangles right.

After adding the mappings (“h”, “H”), (“a”, “A”) and (“z”, “Z”) we have the following tree:


append-only representation

The name append-only stems (I think) from the file based view, where you only write at the end of the file (append).
It is also called a log.
For the one element tree, this is simple, once you realize that since “f” points to “F” you need to write value “F” first before you write the leaf “f”:



The number at the left of an entry denotes its position in the log, so later nodes can refer to it (the 0 in the leaf “f” points to the value at position 0).
Adding stuff can only be done at the right hand side, and things can only point to things that have already been written.
Inserting (“d”, “D”) means first writing the value “D”, then writing the leaf “d”, and finally the branch node.
The result is shown below:



Ok, adding (“h”, “H”), (“a”, “A”) and (“z”, “Z”) yields the following:



You immediately see that the space overhead can be considerable.
At some point in time, you don’t care anymore about older snapshots and you will want to have the possibility to reclaim that space.

minimal implementation

We still don’t have anything to play with. Let’s do a minimal implementation.
First start with a log.

type v = string
type k = string
type pos = int

type entry = 
  | NIL 
  | V of v
  | L of k * pos
  | N of pos * k * pos 

type log = {es:entry array;  mutable next : int;}

Entries are either nil, values, leaves or nodes. Btw, this is OCaml.
A log is an array, combined with an attribute next that tells you where next to ‘write’.
Some helper functions are needed.

let make cap = {es = Array.make cap NIL; next = 0;}
let root_pos log = log.next -1
let get_entry log pos = if pos = -1 then NIL else log.es.(pos)
let get_length log = Array.length log.es
let free log = get_length log - log.next
  
let write log es =
  let do_one e = 
    if free log = 0 then failwith "full";
    let p = log.next in
    log.es.(p) <- e;
    log.next <- p + 1 
  in
  List.iter do_one es

Retrieving a value is straightforward: just descend, starting from the root.

let get log k = 
  let rec find_pos pos = 
    match get_entry log pos with
      | V v -> v
      | L (k0,p0) when k = k0 -> find_pos p0 
      | N (l,k0,r) -> let p' = if k <= k0 then l else r  in
		      find_pos p'			
      | _ -> failwith "Not_found"
  in
  find_pos (root_pos log)
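
To see the descent at work without the rest of the machinery, here is a small sketch that runs the same lookup over a hand-written array. It assumes the last entry of the array is the root; get_in is a made-up name for this example, and an empty array is not handled.

```ocaml
type v = string
type k = string
type pos = int

type entry =
  | NIL
  | V of v
  | L of k * pos
  | N of pos * k * pos

(* Same descent as get, over a bare array whose last entry is the root. *)
let get_in (es : entry array) (k : k) : v =
  let rec find_pos pos =
    match es.(pos) with
    | V v -> v
    | L (k0, p0) when k = k0 -> find_pos p0
    | N (l, k0, r) -> find_pos (if k <= k0 then l else r)
    | _ -> failwith "Not_found"
  in
  find_pos (Array.length es - 1)

(* Log contents after inserting "f", "d" and "h", in that order. *)
let es = [|
  V "F"; L ("f", 0);
  V "D"; L ("d", 2); N (3, "d", 1);
  V "H"; L ("h", 5); N (1, "f", 6); N (3, "d", 7);
|]

let h = get_in es "h"
```

The lookup of "h" visits positions 8, 7, 6 and 5: two nodes, the leaf, and finally the value. A key such as "g" ends up at the leaf for "h", fails the guard, and raises Failure "Not_found".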

Adding a mapping isn’t that hard. The trick is to record the places you visited on the way down: these are exactly the entries that will need to be rewritten.

type dir = Hit | Left | Right
type trail = (dir * entry * pos) list

exception Todo of trail * string
let todo v msg = raise (Todo (v,msg))

After introducing the direction and the trail, I added an exception for cases not handled.
The skeleton for the ‘set’ is rather straightforward:

let set log k v =
  let rec descend trail pos = 
    match get_entry log pos with
      | NIL -> trail
      | V v -> failwith "corrupt"
      | L (k0,p0) as e -> let dir = 
			    if k = k0 
			    then Hit else if k < k0 
			      then Left 
			      else Right 
			  in
			  (dir , e, pos) :: trail
      | N (l,k0,r) as e when k <= k0 -> descend ((Left,e, pos) :: trail) l
      | N (l,k0,r) as e              -> descend ((Right,e,pos) :: trail) r
  in
  let trail = descend [] (root_pos log) in
  let update = build_set k v log.next trail in
  write log update

So first we go down the tree accumulating a trail, which we use to build the update that gets written to the log.
Let’s build the update (in an actual source file, build_set has to be defined before set):

let rec build_set k v start (visited:trail)  = V v :: do_start start k visited 
and do_start start k visited = match visited with
  | [] -> [L (k, start) ]
  | (dir , L(k0,p0),pe) :: rest ->
    begin
      let e = L(k,start) in
      match dir with
	| Hit   -> e :: do_rest (start + 1) rest
	| Left  -> e :: N(start + 1, k, pe) :: do_rest (start + 2) rest
	| Right -> e :: N(pe, k0, start +1) :: do_rest (start + 2) rest
    end
  | _ -> todo visited (Printf.sprintf "do_start %i '%s'" start k)
and do_rest start visited = 
  match visited with
    | [] -> []
    | h :: t ->
      let e = 
	match h with
	  | (Left , N(pl,k0,pr),pe)  ->  N(start,k0,pr) 
	  | (Right, N(pl,k0,pr),pe)  ->  N(pl,k0,start)
	  | _ -> todo visited (Printf.sprintf "do_rest %i" start)
      in
      e :: do_rest (start + 1) t

Creating the update is done in three steps (build_set, do_start and do_rest), of which do_rest is the most difficult.
The key insight is that the side of a node you took on the way down needs to change, while the other side can be copied as-is.

interactivity, please

The piece of code below allows you to dump a log.

let dump log = 
  Array.iteri (fun i e->
    let () = Printf.printf "%2i: " i in
    match e with
      | NIL        -> print_newline()
      | V v        -> Printf.printf "V \"%s\"\n" v
      | L (k,p)    -> Printf.printf "L(\"%s\",%i)\n" k p 
      | N (l,k0,r) -> Printf.printf "N(%i,\"%s\",%i)\n" l k0 r
  ) log.es

An example dump:


 0: V "F"
 1: L("f",0)
 2: V "D"
 3: L("d",2)
 4: N(3,"d",1)
 5: V "H"
 6: L("h",5)
 7: N(1,"f",6)
 8: N(3,"d",7)
 9: V "A"
10: L("a",9)
11: N(10,"a",3)
12: N(11,"d",7)
13: V "Z"
14: L("z",13)
15: N(6,"h",14)
16: N(1,"f",15)
17: N(11,"d",16)
18: 

But examining this to see if all references are correct is rather tedious.
With a few lines of code you can generate output that can be processed by graphviz to generate images that allow visual inspection.
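
Next to visual inspection, part of the checking can be done mechanically. The sketch below is not from the original code; well_formed is a made-up name. It only verifies that every reference points backwards to an entry of the expected kind, and says nothing about key ordering.

```ocaml
type entry =
  | NIL
  | V of string
  | L of string * int
  | N of int * string * int

(* True when every leaf points back to a value and every node points
   back to leaves or nodes. *)
let well_formed (es : entry array) : bool =
  let is_value p i =
    p >= 0 && p < i && (match es.(p) with V _ -> true | _ -> false) in
  let is_tree p i =
    p >= 0 && p < i && (match es.(p) with L _ | N _ -> true | _ -> false) in
  let ok = ref true in
  Array.iteri (fun i e ->
    match e with
    | NIL | V _ -> ()
    | L (_, p) -> if not (is_value p i) then ok := false
    | N (l, _, r) -> if not (is_tree l i && is_tree r i) then ok := false
  ) es;
  !ok

(* The example dump, including the trailing empty slot. *)
let dumped = [|
  V "F"; L ("f", 0);
  V "D"; L ("d", 2); N (3, "d", 1);
  V "H"; L ("h", 5); N (1, "f", 6); N (3, "d", 7);
  V "A"; L ("a", 9); N (10, "a", 3); N (11, "d", 7);
  V "Z"; L ("z", 13); N (6, "h", 14); N (1, "f", 15); N (11, "d", 16);
  NIL;
|]

let fine = well_formed dumped
```

A check like this catches off-by-one mistakes in the positions computed by build_set, which are easy to miss when reading the dump by eye.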

let dot_log ?(f = stdout) log = 
  Printf.fprintf f "digraph Log{\n";
  Printf.fprintf f "\trankdir=\"RL\";\n";
  Printf.fprintf f "\tnode [shape=record];\n";
  let () = Array.iteri (fun i e ->
    let () = match e with
      | NIL -> ()
      | V v     -> 
	Printf.fprintf f "\tnode%i [label = \"{%i|%s}\"];\n" i i v
      | L (k,p) -> 
	Printf.fprintf f 
	  "\tnode%i [label = \"{%i | { %s | <f1> %i} }\"];\n" 
	  i i k p;
	Printf.fprintf f "\tnode%i:<f1> -> node%i;\n" i p
      | N(l,k0,r)  -> Printf.fprintf f "\tnode%i [label = \"{%i| { <f1> %i | %s | <f2> %i}}\"];\n" i i l k0 r;
	Printf.fprintf f "\tnode%i:<f1> -> node%i;\n" i l;
	Printf.fprintf f "\tnode%i:<f2> -> node%i;\n" i r;
    in
    if e <> NIL && i > 0 then Printf.fprintf f "\tnode%i -> node%i [style = invis];\n" i (i-1)
  ) log.es in
  Printf.fprintf f "}"

This needed a bit of experimentation, but the record feature helps a lot.
To ensure things are rendered in the correct left-to-right order, every entry has an invisible arrow to its predecessor.
The last fragment glues graphviz and evince together to have the kind of interactive visualization you appreciate from MATLAB or SciPy.

let view ?(v=dot_log) log = 
  let root = "test" in
  let dot = Filename.temp_file root ".dot" in
  let png = Filename.temp_file root ".png" in
  let oc = open_out dot in
  let () = v ~f:oc log in
  close_out oc;
  let convert_cmd = Printf.sprintf "dot -Tpng -o %s %s" png dot in
  let _ = Sys.command convert_cmd in
  let cmd = Printf.sprintf "evince %s" png in
  Sys.command cmd

let view_log  log = view ~v:dot_log log

Ok, this calls for a screenshot:


closing remarks

The essence is that I built myself a nifty toy to gain insight into persistent data structures in less than 150 lines of code,
which is a tribute to the power of ocaml.
All that may be true, but the code isn’t that minimal, and not very pretty:

  • build_set can be made tail recursive and shorter as well.
  • The mutually recursive functions are not needed either if they are defined in the right order.
  • leaf nodes can be suppressed.
  • dot_log feels hackish.

Most of the time in writing the code was actually spent trying to seduce graphviz into doing what I wanted.

Have fun,

Romain.

Post mortem

This was my first blog post ever.
I should probably have done something shorter as I’m still learning the WordPress web interface.


Rethinking an embedded database API

One of the projects the Incubaid research team is working on is a new embedded database (where ‘embedded’ refers to the way other projects like GNU DBM, Berkeley DB, Tokyo Cabinet or SQLite are used). This project, codename Baardskeerder, is still in its infancy, and we’re toying with different approaches to various problems.

The database itself does not expose a full-fledged SQL interface like SQLite does: the interface provides the ability to store values for a given key, retrieve the value stored under a key, and some more complex operations like range lookups.

The API for a get operation seems rather obvious at first sight: you pass in a key, and the corresponding value is returned if the key has been set before, or the non-existence is signaled somehow (note: all pseudocode in this post is Haskell):

import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Char8 (ByteString)

type Key = ByteString
type Value = ByteString

data DB = DB

get :: DB -> Key -> Maybe Value
get d k = undefined

Since the database is persisted (e.g. in a file on a file system on a disk), and will most likely be used by server-style applications that return values to some client without caring about the values themselves, this style of API has a major drawback. It forces copying values from kernelspace into userspace, after which the value is written to some socket, copying it from userspace back into kernelspace, without any intermediate modification or even access.

In most modern kernels, including Linux, BSD, Solaris and others, there are several system calls available which allow one to send data from files on a socket, without this useless copying to and from userspace. These include sendfile(2) and splice(2). We want to be able to support these optimizations from within applications embedding the database library, whilst still providing the ‘basic’ API as well.

One approach would be to implement a custom getAndSendOnSocket call in the library, but this feels very much ad-hoc. Another way to tackle this is to return a file descriptor, an offset and a length on a get call, but this doesn’t feel right, since we don’t want to leak a file descriptor to the host application as-is.

What we have now is a callback/continuation-driven approach, where a host application can provide an action which will have access to the file descriptor, get an offset and length value, and can do whatever it feels like, whilst being wrapped inside an exception handler.

The signature of our get' action becomes

get' :: DB -> Key -> (Result -> IO a) -> IO a

Here’s a pseudocode-implementation:

import Prelude hiding (concat, length, lookup)
import Data.Binary
import Data.ByteString.Char8
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int32)
import Control.Exception (bracket, finally)
import Foreign.Marshal.Alloc
import Foreign.Marshal.Array
import Network.Socket (Socket)
import Network.Socket.ByteString
import Network.Socket.SendFile.Handle
import System.Posix (COff, Fd, fdToHandle)
import System.Posix.IO.Extra (pread)

type Length = Int32

type Key = ByteString
type Value = ByteString

data DB = DB

data Result = NotFound
            | Value Value
            | Location Fd COff Length

-- Lookup a value in the database
lookup :: DB -> Key -> IO Result
lookup d k = undefined

-- Take a reference to the database
-- This is dummy, the actual API is more complex
ref :: DB -> IO ()
ref d = undefined

-- Return a reference to the database
unref :: DB -> IO ()
unref d = undefined

get' :: DB -> Key -> (Result -> IO a) -> IO a
get' d k h = do
    v <- lookup d k

    let (pre, post) = case v of
            Location _ _ _ -> (ref, unref)
            _ -> (\_ -> return (), \_ -> return ())

    pre d
    h v `finally` post d

Note that a Result can be NotFound, an actual value (in our case this can happen if the value is compressed or encrypted on disk), or a pointer to the value in a given file.

Implementing the classic get becomes trivial:

get :: DB -> Key -> IO (Maybe Value)
get d k = get' d k h
  where
    h :: Result -> IO (Maybe Value)
    h NotFound = return Nothing
    h (Value v) = return $ Just v
    h (Location f o l) =
        Just `fmap` bracket
            (mallocArray l')
            free
            -- TODO This assumes pread(2) always returns the requested number
            -- of bytes
            (\s -> pread f s l' o >> packCStringLen (s, l'))
      where
        l' :: Int
        l' = fromIntegral l

The implementation of a get call which sends a value on a socket, prefixed with the length of the value, becomes simple as well:

sendOnSocket :: DB -> Key -> Socket -> IO ()
sendOnSocket d k s = get' d k h
  where
    h :: Result -> IO ()
    h NotFound = sendMany s zero
    h (Value v) = do
        sendMany s $ BL.toChunks $ encode $ (fromIntegral $ length v :: Length)
        sendAll s v
    h (Location f o l) = do
        sendMany s $ BL.toChunks $ encode l
        f' <- fdToHandle f
        sendFile' s f' (fromIntegral o) (fromIntegral l)

    zero = BL.toChunks $ encode (0 :: Int32)

Instead of sendFile', some FFI binding to splice or something similar could be used as well.

In the current pseudo-implementation, a file descriptor can still be leaked (an application can call get' with an action which stores the given Fd in some IORef), and given actions can alter the file pointer using lseek(2) and others. We could tackle this, if necessary, by duplicating the file descriptor using dup(2) and passing the copy to the application, then closing the duplicate after completion (so there’s no use for the application to store the descriptor: it’ll become invalid anyway).