Tracking Asynchronous IO Using Type Systems

Some time ago I gave a short presentation to some colleagues of mine about the
Python gevent library and the low-level libraries it uses to perform its job
(which mainly boils down to handling asynchronous IO and managing the
microthreads waiting for these asynchronous actions to complete, using libev or
libevent as a wrapper around select/poll/epoll, and the greenlet hack to
support lightweight threads in Python).

The gevent library contains a module which monkey-patches several modules in the
Python standard library, including socket and thread, to change their
synchronous nature into an asynchronous implementation running on top of the
gevent mainloop. This is required to work around one of the major pitfalls of
using async IO in a runtime like Python: any existing code/library/… which
somehow performs (standard) blocking calls will block the (single-threaded,
from an OS perspective) mainloop, and as such inhibit execution of any runnable
microthreads at the time (this is something where e.g. node.js has an edge over
Python when developing highly concurrent servers. Do not fear, I won’t get into
the question of whether or not it’s a good idea to write highly concurrent and
scalable services in Python or JavaScript in this post).

Next to providing a mechanism to handle concurrent IO, the use of greenlets to
manage threads of execution also introduces another useful property of gevent:
instead of threads backed by OS threads, executed in parallel on an SMP system
using preemptive multitasking, the lightweight threads are scheduled in
userspace by the library itself, in a cooperative manner: the places at which a
thread of execution yields execution (also known as a ‘switch’ in gevent) are
explicitly defined inside the code (whether it’s yours or in some library). As
such one can make assumptions (or rather, assertions) about code using mutable
data shared across execution threads which is thread-safe in the cooperative
setting, whilst it would be unsafe in a preemptive scenario.

The latter raised an interesting question from a colleague: is there a way to
assert some code will not yield execution, i.e. that some section will always be
executed as an atomic block? This is, obviously, an important consideration
when relying on this property to write lock-free code which would become
incorrect if the thread could be switched out!

I answered (maybe to their surprise) that this is certainly possible and
standard practice in some languages, yet as far as I know not in Python (or, at
least, not in an elegant way). I didn’t get into this any further at the time,
yet here’s a post in which we will devise such a tracking system.

It’s based on several concepts from the world of Functional Programming, yet
don’t fear: you don’t need any knowledge about any FP concepts at all. Read on
and enjoy the ride!

Asynchronous Programming

As you most likely heard, there are 2 ways data can be provided to something
making a request: synchronously, or asynchronously. In the former case, the
requesting entity will wait (block) until it receives a response, then
continue execution. When using the asynchronous pattern, some code will issue a
request, tell the system what to do when the request has completed and a
result is available, and then continue working on something else, or yield
execution to some other execution thread (please make sure to make a strong
distinction between a thread of execution and a system thread: there might be
some relation between these in some programming patterns, yet conceptually
they’re different beasts!).
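
To make the distinction concrete, here’s a quick sketch of how the same read
operation could be exposed in both styles. These are hypothetical signatures,
made up for this post (file_descr stands for a file descriptor):

read_blocking : file_descr -> int -> string
read_callback : file_descr -> int -> (string -> unit) -> unit

The first call only returns once the requested data is available, blocking its
caller. The second returns immediately, and the given callback is invoked
later, once the data is available.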

Note there’s a major difference between execution mechanisms (sync vs. async)
and APIs: whilst some APIs explicitly expose their asynchronous execution
nature by using callbacks (think node.js, Twisted‘s Deferred type, which is a
sort of callback-on-steroids,…), others might look as if every call is a
blocking procedure call, yet internally everything is dispatched using some
asynchronous mechanism (think CPS, ‘Future‘-style APIs which get compiled
into something using CPS, or (you guessed it) systems like gevent or the IO
manager and microthreads as found in GHC‘s Haskell runtime).

The differences (or should I say, similarities?) of these styles deserve a
complete post on their own though!

Enter Types

Whenever we want to track something at the code level (which basically means at
compile time), all we can use is what is known at compile time: types. We
can’t rely on any values, since there are no actual values available! Go
figure: since much use of asynchronous programming is found when handling IO,
it’d be funny if we knew, while compiling the program, what we’ll receive on a
socket at runtime.

We invent a name for the type which will be used to tag all values which we
received thanks to some sort of asynchronous call (which might introduce a
yield point): we’ll call it async. Now this on its own is rather useless: async
what?

Those of you who have toyed with generics before in Java, C#, Scala or anything
alike might think the answer is rather trivial: we create a generic type to
tag the actual type of the value we retrieved using an async call! Such
generic types are also known as higher-kinded types in FP, yet you shouldn’t
care too much about this for the rest of this discourse.

So, we now got our type, written as ‘a async (using OCaml notation). This is
similar to Async<A> in Java and C# (as far as I remember). For those new to
generics: think of a list of items. One could have a list of strings, one of
ints, one of myobjects or anything else. The types of these lists will then be
string list, int list and myobject list. We can write functions which act on
any kind of list (e.g. check whether the list is empty: it doesn’t matter
what type of values the list contains!). As such the list type is defined as a
more generic type which takes a type parameter: ‘a list. It’s
parametrised over the type ‘a.
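
As a tiny sketch of such a generic function (plain OCaml, nothing asynchronous
yet):

let is_empty l = match l with
    | [] -> true
    | _ -> false

Its inferred type is is_empty : 'a list -> bool: it works on a string list, an
int list or any other list, since it never inspects the values themselves.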

So, a char async is a char we obtained using some async action, and
a value of type string async is a string we got through some async call (it’s
an action which will yield a string value). Do note string async and string
are completely different types, and as such it’s not (and should not be!)
possible to pass a value of type string async to a function which takes a
string value as its input. You could consider a value of type string to be a
real string value, a sequence of characters, whilst a value of type string async
is the representation of "a string value we’ll retrieve using some async call",
which is not the value itself.

Note this type comes pretty close to a Future, which is also often
parametrised (a generic class, Future<A>) and signifies "a box which will
eventually contain a value of type A".

Primitives

For our system to work, we’ll need a couple of primitive, magic operations,
provided by the system, which are the basic building blocks everything else is
built upon. These are the operations which implement the most low-level async
procedures and can only be provided by the runtime system. Everything else
can be constructed on top of these by combining functions and actions, and as
such building our applications, composing entities into larger entities.

We will only introduce 3 of these primitives, which should be sufficient for
the mechanism outlined in this post to be clear. In a real, production library
or runtime system one would need a couple more of these primitive actions to be
able to do something interesting, of course!

Of these 3 actions, 2 work on file descriptors: one writes a given string to a
given file descriptor and doesn’t return any result (we assume the data will
always be written at once, and this will never fail, unlike the write(2)
system call, obviously), the other reads a string of requested length from a
given file descriptor (the same principle with regard to success and failure
applies).

The third action allows a thread to sleep for a given amount of time (say,
seconds, although that obviously doesn’t make any difference). The result of
this action contains the number of seconds the thread actually did sleep.

Here are the prototypes. Note unit is a type with only a single value, (),
which is used when some other languages use void (it’s not exactly the same
thing, but for now this comparison should hold):

write : file_descr -> string -> unit async
read : file_descr -> int -> string async
sleep : int -> int async

Keeping the Genie in the Bottle

Ok, first steps are done: we got a type to wrap values which were obtained
using an async action, and we got a couple of actions defined. Next step: doing
something useful with these operations!

Let’s start with a very simple exercise: writing an action which reads a single
character from standard input (a value named stdin of type file_descr) and
writes it to standard output (a value named stdout of type file_descr).

First, we’ll create 2 utility functions to handle standard input and output, by
using partial application. If you don’t know what this is, consider the
following example: given the function mul : int -> int -> int which
multiplies 2 integers, we can use partial application and call mul with a
single argument, e.g. 10, which results in a function of type int -> int.
Whenever passing an argument to this function, the result will be 10 times this
argument.
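
As a quick sketch (the mul function is made up for the occasion):

let mul a b = a * b

let mul10 = mul 10

Here mul has type int -> int -> int, whilst mul10, obtained through partial
application, has type int -> int: mul10 5 evaluates to 50.

With that in place, here are our utility functions: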

let read_stdin = read stdin
let write_stdout = write stdout

The types:

read_stdin : int -> string async
write_stdout : string -> unit async

Now we could try to write our program:

let attempt () =
    let char = read_stdin 1 in
    let _ = write_stdout char in
    ()

Failure all over! First of all, the code will not type-check:

Error: This expression has type string async
       but an expression was expected of type string

referring to the char argument on the line calling write_stdout.

Here’s why: write_stdout wants its first argument to be of type string
(this is the type of the second argument of write, as you know), but the
value we provide, named char, is not of type string: its type is
string async, the return type of the read_stdin (or, underneath, read)
action!

Next to that, even if the code did type-check, our initial goal would not be
met: the type of attempt would be unit -> unit, which doesn’t signify we
used any of the async primitives at all! Our action must return something
of type unit async, and there should be no way to write an action whose
return type is not ‘a async for some type ‘a!

Back to the drawing board… It looks like standard assignment and passing
values around as-is won’t work (remember I stressed it’s important to make the
distinction between a string value and something representing some string
retrieved using some async action of type string async?).

We dig into our FP toolkit once more, and grab another hammer (after the type
system we used earlier): function composition! Or, in this case, action
composition.

What we want is a function we can use to link two actions together into a
new action!

Let’s try to figure out some type signature for such function:

link : 'a async -> 'b async -> 'b async

link takes an initial action which yields something of type ‘a, and a second
action which yields a ‘b, and combines these into a new action of type
‘b async.

But wait, this can’t be right either! In this setup, the second action still
has no way to retrieve and actually use the ‘a value as encapsulated by the
first action!

We need to extend our link function to unwrap whatever the first given action
yields, then pass it to the second argument as a proper value it can use,
whilst still making sure the fact there’s something asynchronous going on is
retained.

Here’s what the type of our link function should look like:

link : 'a async -> ('a -> 'b async) -> 'b async

The second argument now became a function (cool, huh, functions taking functions
as arguments? These are also called "higher-order functions" (remember
"higher-kinded types"?) and are very powerful) which takes a value of type
‘a, and results in an action of type ‘b async, which is then used as the
result of the link function.

Note how, just by looking at the type signature, you can see the actual ‘a
value can never leak out: only the second argument ever gets access to the
plain ‘a value, and all it can ever produce is another ‘b async action.

Do note we will not discuss how link is implemented, since this is
unrelated to this post, and depends on how the whole library is designed and
implemented!

Let’s get back to our single-character echo action: using link, we need an
initial action of type ‘a async. We got that: read_stdin 1 has type
string async. Ok, next we need a function of type ‘a -> ‘b async. We know
what ‘a is now, since we already decided to use read_stdin 1 as first
argument, so ‘a is string. We’re looking for a function of type
string -> ‘b async which writes the given string to the screen. This is easy:
write_stdout has exactly this type, using unit for ‘b!

Here goes:

let echo_one = link (read_stdin 1) write_stdout

The type of echo_one is unit async, like we want it to be!

From now on though, we won’t use the name link anymore: this is just
something I made up. A more common name for such function is bind, which
we’ll use from now on. Next to that, there’s an infix operator (an infix
operator is a function which takes 2 arguments and is placed in-between
these arguments instead of before them, like the + in 1 + 2) called >>=.
This allows us to rewrite our echo_one action like this:

let echo_one' = read_stdin 1 >>= write_stdout

Let’s make things more interesting: writing an action which reads 10 characters
from standard input, then sleeps at most 2 seconds, then writes these
characters to some given file descriptor:

let sleepy_writer out =
    read_stdin 10 >>= fun data ->
    sleep 2 >>= fun _ ->
    write out data

You might notice the use of indentation and anonymous functions, and we ignore
the result of the sleep action (we use _ as its binding name), but the
code should be easy to understand.

If this isn’t clear, here’s how to read the above snippet: we define a function
called sleepy_writer which takes a single argument called out. Upon
invocation, the function will cause 10 chars to be read from stdin.
read_stdin 10 is an action which, upon completion, will yield a string. We
bind a function which takes this string value (which we bind to the name
data in an anonymous function) and returns another action: everything starting
with sleep up until the very end of the function body. So, once read_stdin 10
has completed, we continue with this next action, which will make the
current thread of execution sleep for 2 seconds. Once again, we bind this to a
function which takes the value which the sleep 2 action yields and ignores
it (by binding it to the name _), then results in one last action which will
be executed as well, and will write the data value (which is at that moment in
scope!) to the given out file descriptor. The result of the sleepy_writer
action will be the result of the write action.

Try to figure out the type of sleepy_writer. Got it?

sleepy_writer : file_descr -> unit async

Notice we didn’t get rid of the async marker!

Finally, an action which keeps reading chunks of data of given chunk size from
a given file descriptor, then writes it to another given file descriptor:

let rec copy_stream chunk_size in_fd out_fd =
    read in_fd chunk_size >>= fun data ->
    write out_fd data >>= fun () ->
    copy_stream chunk_size in_fd out_fd

Even though copy_stream is infinitely-recursive, its type can be inferred:

copy_stream : int -> file_descr -> file_descr -> 'a async

Yet again, the async marker sticks.

Do note, in real-world code, one should not use a top-level rec definition
like this, but define some loop action in the body instead, as sketched below.
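
Such a version could look like this (a sketch; the behaviour and the type stay
exactly the same, the recursion is merely kept local):

let copy_stream chunk_size in_fd out_fd =
    let rec loop () =
        read in_fd chunk_size >>= fun data ->
        write out_fd data >>= fun () ->
        loop ()
    in
    loop ()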

Return, But Not As You Know It

One last step in our journey is required. Up until now, all actions we created
by combining other actions using bind had the result value of the last
action in the chain as their result, whilst in some actions we want to
calculate such a result and return the value ourselves. Due to the constraints
we want to enforce (and as imposed by the only function we can use to actually
use actions, bind), we can’t just use plain values: they need to be wrapped in
the async container as well. So here’s what we need: something which turns an
‘a into an ‘a async:

whatever_it_is_called : 'a -> 'a async

For some reason, this function is usually called return. Don’t be mistaken:
this is completely unrelated to the return statement as found in
C/Java/C#/Python/PHP/… and has nothing to do with ending the execution of a
procedure and signaling some result value or anything alike. It’s a normal
function which puts a value in an async box, even though this value itself was
not retrieved using some async action as-is:

return : 'a -> 'a async

Given this, we can write some more interesting actions. As a first example,
let’s write an action which reads a single line from a given file descriptor by
reading characters one-by-one until it finds a ‘\n’ character, then yields
the line it read (without the newline):

let read_line fd =
    let rec loop acc =
        read fd 1 >>= fun char ->
        if char = "\n"
        then return acc
        else loop (acc ^ char)
    in
    loop ""

If you’re not versed in FP yet, this might be somewhat hard to read and
understand at first. Take a second look and follow the execution flow manually,
and it’ll become clear. It might be useful to know the ^ operator is used to
concatenate strings:

(^) : string -> string -> string

Did you try to figure out the type of read_line? It’s as intended:

read_line : file_descr -> string async
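
And since read_line is an action like any other, it composes with everything we
built before: a line-based echo (a sketch, reusing the stdin value and the
write_stdout helper from earlier) becomes a one-liner of type unit async:

let echo_line = read_line stdin >>= write_stdout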

One more example: since the sleep action might return even before the
requested number of seconds has passed (don’t ask me why, I just made that up),
we want to write an action which sleeps at least the given number of seconds,
and as little more as possible (otherwise we could sleep eternally. Nice
try!). We don’t care how long we slept in the end (which is a rather stupid
requirement: a serious API would return this value, and a caller is free to
ignore this).

Here we go:

let sleep_min sec =
    let rec loop left =
        sleep left >>= fun slept ->
        if left <= slept
        then return ()
        else loop (left - slept)
    in
    loop sec

The type of sleep_min? int -> unit async.

Conclusion

Here we are! Using the ‘a async type, bind and return, we have a system
which allows us to combine and use asynchronous actions, whilst being sure we
can never forget something async is going on under the hood, no matter how
complex the actions themselves become. If we don’t see something of the
‘a async type, we can be certain nothing is using the async primitives
underneath.

Notice how we were able to implement something which gives what we wanted from
the beginning, without any specific support in the language we’re using:
only fairly standard type system requirements, and functions, as available in
several languages, including OCaml, Haskell and several others (although in
languages without first-class functions etc. the syntax might become an
issue, thinking of Java and the like).

Thanks to the use of types, the compiler can be very helpful during development
to reduce the number of potential runtime issues. Even though a system like the
above can be implemented in dynamically-typed languages like Python or Ruby,
having compile-time type checking offers a lot of safety!

Note this has been a very basic introduction, so now comes…

Going From Here

Once you’ve reached this point, you might want to get to know more about the
mechanics behind the system outlined above. As some might have heard, bind
and return are often used in the presence of monads, and indeed, one might
think our ‘a async type is monadic (it might be, but not necessarily: the
monad laws won’t be fulfilled in the presence of real IO). Overall, monads
provide a way to track "things with a tag which should always keep this tag".
The above is a very informal definition and introduction, but the interested
reader might refer to one of the many monad tutorials available on the internet
(all of varying quality and usefulness).

Next to this, reading up on functors (not the OCaml kind, but things with the
function fmap : ('a -> 'b) -> 'a f -> 'b f) could be useful as well (our
‘a async type is a functor, like any other monad:
let fmap f a = a >>= fun v -> return (f v)).
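
As a small example of what fmap gives us, here’s a sketch (reusing read_stdin
from earlier, and assuming the read yields a non-empty string) which maps a
plain function over an async value without ever unwrapping it:

let code_of_next_char = fmap (fun s -> Char.code s.[0]) (read_stdin 1)

The type of code_of_next_char is int async: the async tag sticks, as always.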


Tracing block device write request sizes in Linux using SystemTap

Recently some people started to wonder how Arakoon, our distributed key-value store, handles the drive on which data is stored. To be more precise, this boils down to how Tokyo Cabinet (which we currently use as database implementation) submits write requests to the kernel, and how the kernel processes these later on, after translating the filesystem-level requests into write operations in the block layer.

This is considered important to know due to the usage of SSD drives (think wear leveling and other issues coming with these devices).

Tracing how the application writes data to a file can be achieved using the strace tool, looking for write(2) and pwrite(2) calls (although this is not necessarily sufficient: the application might be writing to a file using mmap(2) memory mappings).

On most loads, this won’t correspond to how block requests are sent to the actual hardware, though: writes can be buffered, reordered, etc. What we want is a way to gather information of write requests in the kernel block layer.

In Linux, one can use SystemTap (which is somewhat like DTrace on Solaris) to hook into the kernel at runtime and collect statistics, or lots of other information. One writes a script in which one hooks into trace points or function calls, then processes the incoming information.

I won’t go into the installation of SystemTap, since this will depend on the distribution you’re running. Do note you might not need root access to execute the scripts (so you shouldn’t use it!): on my Fedora 16 system, all I had to do was put my user in the stapsys, stapusr and stapdev groups (although I’m not sure I really need to be in all of those…). You’ll need the debugging symbol files of your running kernel available, among other things (in my experience, attempting to run a script on a system where some prerequisites are missing will result in the stap tool telling you what you need).

Writing SystemTap scripts sometimes requires some knowledge of kernel internals, to know where to hook in. Since I wanted to gather some statistics about block-layer calls, I knew I had to look for calls in the ‘bio’ subsystem, short for "Block IO". Luckily SystemTap comes with some predefined combinations of hooks, called ‘tapsets’. These are stored (on my system at least) in /usr/share/systemtap/tapset. One of these files is called ioblock.stp, which sounded interesting. Since I was too lazy to look for documentation, I opened the file in a pager and read through it. The ioblock.request probe, which provides access to a size value, looked interesting. After reading some example SystemTap scripts, here’s what I came up with:

# Per-device statistical aggregate of write request sizes
global writes

# Fires on every block IO request; record the size of write requests,
# keyed by device name
probe ioblock.request {
    if(bio_rw_num(rw) == BIO_WRITE)
        writes[devname] <<< size
}

# On exit (e.g. CTRL-C), print a log2 histogram of write sizes per device
probe end {
    printf("\n")
    foreach([devname] in writes-) {
        printf("Device: %s\n", devname)
        println(@hist_log(writes[devname]))
    }
}

Running this using the stap tool, and closing it after a while (using CTRL-C) gives the following output:

$ stap -v blockio.stp
Pass 1: parsed user script and 92 library script(s) using 210196virt/30852res/3136shr kb, in 140usr/20sys/251real ms.
Pass 2: analyzed script: 3 probe(s), 21 function(s), 2 embed(s), 2 global(s) using 452960virt/226472res/92556shr kb, in 1500usr/100sys/2541real ms.
Pass 3: translated to C into "/tmp/stapeJldxD/stap_e40cc31c16b7dd290dabd43749c577a4_12507_src.c" using 452960virt/226600res/92684shr kb, in 10usr/0sys/12real ms.
Pass 4: compiled C into "stap_e40cc31c16b7dd290dabd43749c577a4_12507.ko" in 1780usr/350sys/2595real ms.
Pass 5: starting run.
^C
Device: sda5
value |-------------------------------------------------- count
    0 |@@@@@@@@@@@@@@@                                    30
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |                                                    1
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@                        54
 8192 |@@@                                                 7
16384 |@                                                   3
32768 |                                                    0
65536 |                                                    0

Device: dm-2
value |-------------------------------------------------- count
    0 |@@                                                  5
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |@                                                   2
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                      58
 8192 |@@@@@@@                                            14
16384 |@@@                                                 6
32768 |                                                    0
65536 |                                                    0

Device: dm-4
value |-------------------------------------------------- count
    0 |@                                                   2
    1 |                                                    0
    2 |                                                    0
      ~
  256 |                                                    0
  512 |                                                    0
 1024 |@                                                   2
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@                      58
 8192 |@@@@@@@                                            14
16384 |@@@                                                 6
32768 |                                                    0
65536 |                                                    0

Device: dm-1
value |-------------------------------------------------- count
 1024 |                                                    0
 2048 |                                                    0
 4096 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ 50
 8192 |                                                    0
16384 |                                                    0

Device: sda
value |-------------------------------------------------- count
 1024 |                                                   0
 2048 |                                                   0
 4096 |@@                                                 2
 8192 |                                                   0
16384 |                                                   0

Pass 5: run completed in 10usr/50sys/15274real ms.

This is a histogram of bio write requests to all block devices on my system while the script was running. If you’d see lots of requests smaller than the page size on an SSD device (this depends on the hardware), you might want to look into the write patterns of your application (although even then, things might not be as bad as they look at first sight, since the SSD drive could perform batching, remapping and whatever else internally).

Do note the script output displayed above is not the behavior of Arakoon or Tokyo Cabinet: it depicts a rather standard desktop session (running Google Chrome, Evolution, Empathy, Skype and some more applications) during a very short timespan. The test was executed on a Fedora 16 laptop system, containing a spinning hard drive, using a combination of Ext4 and XFS on several LVM2 volumes on kernel 3.2.7-1.fc16.x86_64.


Announcing Baardskeerder

We’re happy to announce the public availability of Baardskeerder, our implementation of a Copy-On-Write (append-only) B-tree-ish embedded database, after approval by our CEO. The source is available on GitHub as part of the Incubaid organization at https://github.com/Incubaid/baardskeerder, under the LGPL-3 license.

This is a technology preview. Baardskeerder is still under heavy development, and breaking changes will occur before a first stable release is made.

After experiencing performance issues when storing “large” (> 8kb) values in TokyoCabinet, the storage back-end used by our distributed key-value store Arakoon, the Baardskeerder project was initiated. Work has been started to integrate Baardskeerder in Arakoon as well.

If you’re interested in the name of the project, take a look at Wikipedia.


Hole-punching compaction of an append-only database

The Baardskeerder project, as explained before, is an implementation of a B-tree-ish data structure, using an append-only file to persist the data to a storage device (e.g. a hard drive or SSD device). Not overwriting data has several advantages (consistency guarantees, efficiency,…), yet it also incurs a major disadvantage: over time, an increasing amount of data is stored and won’t ever be reclaimed, even if the data itself isn’t of any use any longer (e.g. a value node could be no longer referenced because the corresponding key was deleted or a new value was stored, internal nodes describing the tree itself become invalid because the tree structure changes when new key/value-pairs are added,…).

Even though storage capacity becomes cheaper, there’s no such thing as an infinite disk, and not freeing disk space whose data no longer has any actual value is wasteful.

One way to reduce the overhead is to rewrite the database, so it only contains the data contained in the tree at the moment the action is initiated. We call this a “full compaction”, rewriting the database to a second file. This has a major advantage: after compaction, the new database file is the smallest you can get (it doesn’t contain any overhead data), and as such data is stored very dense.

There are some drawbacks as well: the operation can’t be performed online without taking specific measures, since the resulting database will only represent a snapshot of the original one. There might be a storage capacity issue as well: because the data is copied to another file, we need enough free disk space to be able to perform these copies. In the worst case, if no data can be discarded at all, we’ll need 100% of the original database size as free space, i.e. when compacting to the same disk, the size of the original database should be less than 50% of the total disk capacity. Do note this really is a worst-case scenario.

Next to this, when the resulting database of the compaction process should be stored on the hard disk which also contains the original database, performance will suffer: lots of random IO will be needed to read the original database, and because of this the IO operations to write the output file, which should be sequential, append-only during normal operation, will become random as well (because it’s interleaved with random reads).

Overall, even though the “full compaction” mechanism yields the best result in the end, it’s not something you want to run too frequently. On the other hand, we do want to return the overhead storage to the system so it can be reused for more useful purposes. This operation should be cheap, so it can be executed more frequently, and the number of IO operations required to perform the operation should be as low as possible.

 

First, a little intermezzo (you can skip this if you know what a ‘sparse’ file is). Most “modern” filesystems, especially (but not only) in the UNIX world, have support for ‘sparse’ files. A sparse file is a file which contains empty regions, blocks which appear to be filled with zeros, but aren’t really mapped to blocks on the block device to which the filesystem writes its data. These empty regions can be added to the file when it’s being written (you can extend a file with such empty space), and data can be written to these empty regions later on (at this point in time, the blocks are mapped to physical blocks, of course).

These ’empty’ regions are taken into account when determining the size of the file, but they don’t occupy any space on the disk device. When reading from these regions, all you get are ‘zero’ bytes.

Here’s a simple demonstration, creating a large (2GB) sparse file containing no data at all, then displaying its apparent size as well as the actual size on disk:

[nicolas@tau ~]$ dd if=/dev/null of=demo.dat seek=$((2 * 1024 * 1024)) bs=1k
0+0 records in
0+0 records out
0 bytes (0 B) copied, 2.6617e-05 s, 0.0 kB/s
[nicolas@tau ~]$ ls -lh demo.dat
-rw-rw-r--. 1 nicolas nicolas 2.0G Dec 16 16:10 demo.dat
[nicolas@tau ~]$ du --block-size=1 demo.dat
0 demo.dat

This shows the size of the generated file appears to be 2.0GB, but it’s stored to disk using exactly 0 bytes (next to the inode/metadata size, of course).

During normal operation, one can create a sparse region in a file by using the ftruncate(2) system call. This only allows appending an empty region to the end of a file, not marking an existing region as being ’empty’.

 

Luckily, (very) recent Linux kernel versions allow this in a couple of file systems: support for the FALLOC_FL_PUNCH_HOLE option of the fallocate(2) call is available in at least ext4, XFS and OCFS2. Using this option, an existing region in a file can be marked as ’empty’, even though it contained data before. The file system will change its mappings so the region becomes sparse, and zero out any ‘overflow’ bytes at either end of the region (since only whole blocks can be unmapped).

Thanks to this feature, we can implement cheap resource deallocation by punching holes in a database file for every region which is no longer of any use. Blocks will become available again, and the file system will handle defragmentation (if required), whilst all offsets encoded in the internal nodes will remain valid.

 

Now we know the mechanism to free unused resources, we need to find out which resources can be freed, i.e. calculate where we can punch holes in the database file. Initially, we thought about walking the tree, keeping track of the offset and size of all referenced entries as a list of integer pairs, then merging adjacent pairs, and finally punching holes at all areas of the file which aren’t part of this final map.

This approach has a major drawback: a lot of state is being accumulated throughout the traversal, which results in lots of memory usage: the number of entries required to represent a large database can be huge!

Thanks to a very basic observation, a simpler, more efficient and elegant algorithm is available though! Here’s the deal: entries are added to the database file using append-only writes. Any entry can contain pointers to other entries, encoded as the offsets of these entries in the file. An entry can, at all times, only point to other entries that were stored before itself. As such, an entry can only point to entries on its ‘left’, at lower offsets; an entry can never reference another entry on its ‘right’.

This simplifies things a lot, and allows us to implement the following recursive algorithm: the state passed at every function call consists of only 2 values: a set of offsets (integers) S, and a single offset (integer) O.

In every iteration, the largest integer in set S is determined, and removed from the set. Let’s call this value C. This is the offset of the entry we’ll handle during this iteration. We read the entry from disk, and parse it. If it’s an entry which references other entries (i.e. a Leaf or Index entry), we add the offsets of every entry it references to set S. Do note, at all times, these offsets will be smaller than C (entries can only refer to other entries found earlier in the file!).

Next, we calculate the size L of the entry we’re dealing with, and calculate the offset of the end of the entry in the file, E = C + L.

At this point, we can safely punch a hole from E to O (read on to know what O represents).

Now we check whether S is empty. If it is, there are no more referenced entries below offset C. As such, we can punch a hole from the beginning of the file (or, in the Baardskeerder on-disk format, after the metadata blocks) up to C.

Otherwise, if S isn’t empty, a recursive call is made, passing the updated set and using C as value for O.

Finally, we need to initiate the operation. Here’s how: first, find the commit entry of the oldest version of the database to be retained (remember, Baardskeerder is a functional/persistent data structure!). Any older versions (with commit entries at lower offsets) will become invalid because of the compaction process. Now, the recursive process can be initiated using the singleton set containing the offset of the root entry to which the commit entry points as S, and the offset of the commit entry as value of O.

Note set S will, unlike the list in the initial approach, never be very big: at all times, it only contains the offsets of the known entries to the left of the one we’re dealing with; it’ll never contain the offsets of all entries in the database.

Here’s a Haskell implementation of the algorithm, which might clarify some things:


import Prelude hiding (null)
import Data.ByteString (ByteString)
import Data.Int (Int32)
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import System.Posix (COff, Fd)

type Key = ByteString
type Value = ByteString

type Offset = COff
type Length = Int32

data Entry = Commit Offset Length Offset
           | Leaf Offset Length Value
           | Node Offset Length [(Key, Offset)]

-- Read and parse the entry at the given offset (implementation elided here)
getEntry :: Fd -> Offset -> IO Entry
getEntry f o = undefined

-- Punch a hole over the given region of the file (implementation elided here)
punch :: Fd -> Offset -> Length -> IO ()
punch f o l = undefined

data CompactState = CS Offset IntSet

fileStart :: Offset
fileStart = 4096

compact :: Fd -> Offset -> IO ()
compact f o = do
    c <- getEntry f o
    case c of
        Commit _ _ r -> compact' f $ CS o (IS.singleton $ fromIntegral r)
        otherwise -> error "compact: not a commit entry"

compact' :: Fd -> CompactState -> IO ()
compact' f (CS n os) = do
    -- At this stage, os should never be empty
    let (h, os') = IS.deleteFindMax os
        h' = fromIntegral h

    e <- getEntry f h'

    let (e', os'') = case e of
            Leaf o l _ -> (o + fromIntegral l, os')
            Node o l rs ->
                (o + fromIntegral l,
                    foldr (IS.insert . fromIntegral) os' $ map snd rs)
            otherwise -> error "compact': invalid entry type"

    punch f e' $ fromIntegral (n - e')

    if (IS.null os'')
    then punch f fileStart $ fromIntegral (h' - fileStart)
    else compact' f (CS h' os'')

To conclude, some related “real-world software engineering” notes:

  • Since only blocks can be unmapped, calls to “punch” should be block-aligned, otherwise overflow is filled with zeroes, which results in completely useless and unnecessary random write operations
  • One might want to use a threshold to punch only if the hole would be big enough (e.g. at least 10 consecutive blocks): creating lots of small holes can introduce unwanted fragmentation (this depends on the usage pattern of your database)
  • It’s possible to be "smart" and only punch holes at regions which are still backed by physical blocks (the algorithm doesn’t, and shouldn’t, take any existing holes into account!). There are 2 ways to request information about existing holes from the file system: using ioctl(2) with the FS_IOC_FIEMAP option, if the file system supports this, or using lseek(2) with SEEK_HOLE and SEEK_DATA (which also requires filesystem support to work correctly). No testing has been performed yet to check whether this provides any benefits.

Guest lecture: “Real-World Functional Programming @ Incubaid”

The Incubaid Research team was invited by prof. dr. ir. Tom Schrijvers (Ghent University, UGent) to give a guest lecture about the industrial relevance of Functional Programming (FP), as part of his master’s course on "Functional and Logic Programming Languages" (which covers Haskell and Prolog).

The talk covered what we’re doing at Incubaid, why we are using FP (including advantages and disadvantages), as well as a short introduction to OCaml and a comparison to Haskell, taking the prior knowledge of the audience into account.

These are the slides used during the lecture [PDF]:


Hybrid sync & async Python request/response protocol client implementations

Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.

Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.

In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous and asynchronous transports can be implemented, and new request/response commands can be added easily, at a single place in the source code.

Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:

import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')

SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
    len(ERROR_MESSAGE), ERROR_MESSAGE)

LEN_STRUCT = struct.Struct('<I')

def handle(conn):
    while True:
        command_data = ''
        while len(command_data) < COMMAND_STRUCT.size:
            data = conn.recv(COMMAND_STRUCT.size - len(command_data))

            if not data:
                return

            command_data += data

        command, = COMMAND_STRUCT.unpack(command_data)

        if command == 1:
            len_data = ''

            while len(len_data) < LEN_STRUCT.size:
                data = conn.recv(LEN_STRUCT.size - len(len_data))
                len_data += data

            len_, = LEN_STRUCT.unpack(len_data)

            data = ''
            while len(data) < len_:
                data += conn.recv(len_ - len(data))

            conn.send(SUCCESS_CODE)
            conn.send(struct.pack('<L%ds' % len(data), len(data), data[::-1]))
        else:
            conn.send(ERROR_CODE)
            conn.send(ERROR_MESSAGE_DATA)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)

    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )

        threading.Thread(target=lambda: handle(conn)).start()

if __name__ == '__main__':
    main()

Every message sent by a client starts with a 32bit integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. This takes a single string as argument. Strings are encoded in two parts: first, a 32bit integer containing the length of the string, followed by the actual string data as characters.

When the server receives a command, it sends back a 32bit integer denoting success (0x00) or failure (0x01). The ‘reverse’ command simply returns the input string, reversed, using the same string encoding as described before. Once this cycle is complete, a new command can be sent by the client.

Now, on to the client side. We’ll need some imports:

import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils

The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.

We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes, even though it might not have the required amount of data available immediately, and may want to yield execution (in case of asynchronous networking). To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state whenever possible, whilst maintaining a simple API.

Here’s how the API works: a single protocol action (e.g. reading the response to a request) is backed by a coroutine, which yields one or more ‘Request’ objects, each encapsulating the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, or a ‘Result’ object, which encapsulates the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally a ‘Result’.

The definitions are very simple:

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

Next, the protocol uses a couple of different types of values: 32bit (unsigned) integers, and strings. The latter uses the former in its internal encoding.

Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.

‘check’ performs input validation for values of the given type (type check, boundary check,…). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yields ‘Request’ and ‘Result’ objects to receive a value of the type.

For basic types (e.g. integers), values packed using the ‘struct’ module can be used, so the base implementation provides the required functionality for this. Here’s the base type, and implementations for both 32bit unsigned integers and strings:

class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError

        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError

        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)

        yield Result(result)

class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError

        if value < 0:
            raise ValueError('Unsigned integer expected')

        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()

class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)

        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_

        yield struct.pack('<%ds' % length, value)

    def receive(self):
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        length = request.value

        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)

        yield Result(result)

STRING = String()

Now that the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received similarly to the corresponding methods on ‘Type’. Most, if not all, necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!

Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.

class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_

        for arg in self.ARGS:
            name, type_ = arg

            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        code = request.value

        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()

        request = result_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        result = request.value

        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))

class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()

        self.text = text

Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used on the go, based on the following 2 utility functions:

def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]

        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)

def call(message_type):
    def wrapper(fun):
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']

            if not self.connected:
                raise RuntimeError('Not connected')

            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)

            message = message_type(*args)

            return self._process(message)

        return wrapped

    return wrapper

The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is obvious.

class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        assert False

    def _process(self, message):
        raise NotImplementedError

That’s about all there is. What’s left is transport-specific client implementations.

Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and an implementation of ‘Client._process’ to handle the interaction between the protocol and the transport. The implementation is pretty straightforward:

class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)

            receiver = message.receive()
            request = receiver.next()

            while isinstance(request, Request):
                data = ''

                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception

                    data += d

                request = receiver.send(data)

            if not isinstance(request, Result):
                raise TypeError

            utils.kill_coroutine(receiver, logging.exception)

            return request.value
        except Exception:
            try:
                self._socket.close()
            finally:
                self._socket = None

            raise

The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you ever wrote a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocol.stateful.StatefulProtocol’, which simplifies a lot.

class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)

        self._handlers = collections.deque()
        self._currentHandler = None

        self._connected = False
        self._deferredLock = defer.DeferredLock()

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)

        return deferred

    def getInitialState(self):
        self._currentHandler = None

        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None

        try:
            # Handlers are queued in FIFO order, so take the oldest one
            self._currentHandler = handler = self._handlers.popleft()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        request = handler[0].next()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()

                return None

            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        receiver, deferred = self._currentHandler

        try:
            request = receiver.send(data)
        except Exception, exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)

            return self.getInitialState()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None

        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

        deferred.callback(result.value)

        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False

        self._cancelHandlers(reason)

        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()

            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.errback(reason)


That’s it! Finally, we can test our clients against the server:

HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result, '=>', result2
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())

    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()
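
Running this against the example server should print something along these lines:

sync => cnys
cnys => sync
twisted => detsiwt
detsiwt => twisted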


If, for example, an ‘add’ method is added to the server which returns the sum of two given 32-bit unsigned integers, we could define a new command like this:

class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()

        self.a = a
        self.b = b

Next, add it to the ‘Client’ class like this:

@call(Add)
def add(self):
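    # The body is never executed: the 'call' decorator generates the
    # actual implementation from the 'Add' message definition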
    assert False

Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!
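
Assuming a matching ‘add’ handler on the server side, usage then mirrors the ‘reverse’ calls shown earlier:

client = SyncClient()
client.connect(HOST, PORT)
print client.add(1, 2)  # Should print 3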


This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists; see the Pyrakoon source code for how these are handled. Only a type definition needs to be added, and multiple commands can then use it as-is.
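
To give an idea, here’s a sketch of what an ‘option’-style type could look like, building on the ‘Request’/‘Result’ coroutine protocol used throughout this post. Both the interface and the wire format (a tag byte followed by the wrapped value) are assumptions for illustration, not the actual Pyrakoon implementation:

class Option(object):
    '''Wrapper for values which may be absent.

    Assumed wire format: a single tag byte, 0x00 for None, 0x01
    followed by the serialized inner value otherwise.
    '''

    def __init__(self, inner):
        self._inner = inner

    def serialize(self, value):
        if value is None:
            yield '\x00'
        else:
            yield '\x01'

            for data in self._inner.serialize(value):
                yield data

    def receive(self):
        tag = yield Request(1)

        if tag == '\x00':
            yield Result(None)
        else:
            # Drive the inner type's receive coroutine, forwarding its
            # data requests to our caller
            receiver = self._inner.receive()
            request = receiver.next()

            while isinstance(request, Request):
                data = yield Request(request.count)
                request = receiver.send(data)

            yield Result(request.value)

A command definition could then use, say, ‘RETURN_TYPE = Option(UNSIGNED_INT32)’ without any of the client backends having to change.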

Using the approach described in this post, it becomes easy to provide client implementations on top of several different backends (blocking or non-blocking, using sockets or anything else as transport,…), and to add new commands/calls to all clients at once, keeping them in sync. This simplifies client maintenance considerably.


On segmentation faults, stack overflows, GDB and OCaml

A couple of months ago, we got reports of crashes of our distributed, consistent key-value store, Arakoon (if you don’t know it, go check it out!). The process died, out of the blue, due to a segmentation fault. These crashes seemed to occur at random, and we found no way to reproduce them. All we got was a couple of core dumps.

Arakoon is written in OCaml, and uses the Lwt library for lightweight threads (providing concurrency support).

There are a couple of things which can cause a segmentation fault: writing to or reading from unmapped memory, writing to a read-only mapping, etc. When we loaded the failing binary and the core dump into the GNU Debugger, GDB, it was unable to produce a sensible backtrace. Sadly enough, I no longer have the ‘bt’ output around.

After some more investigation and head-scratching, we noticed the value of the stack pointer, stored in the ‘rsp’ register, was just under a 4 kilobyte boundary (its low 12 bits were almost zero). Looking at the code around the instruction pointer (the ‘rip’ register) at the moment of the crash, our suspicions were confirmed: a value was being written on the stack, just below this 4k limit.

On Intel x86 systems (like the ones we’re using, x86-64), the stack is, basically, a region of memory similar to the heap, which grows towards lower addresses. The crashes we were investigating were caused by a so-called stack overflow: the process tried to write to an address below the mapped pages of the stack!

Do note, when using OCaml, there are only two causes of segfaults, barring some unlikely bug in the compiler or code generator: stack overflows, or whatever happens when calling out to C code (in which case the issue isn’t related to OCaml at all).

To generate a useful backtrace, required to pinpoint and fix the issue, we had to resort to other means. By inspecting the assembly of the crashing code, as well as the contents of the process memory just above the 4k-aligned page the stack pointer points at, we were able to reason about what happened at runtime, and calculate valid values for both the stack pointer and the instruction pointer, i.e. the values of the rsp and rip registers in the frame before the one in which the segmentation fault occurred.

After we figured out these 2 values, we replaced the faulty ones in the binary core dump file using a hex editor, taking the machine byte order into account (i.e. when replacing 0x00007fff5ed6b000, one should search for the byte sequence ‘0x00 0xb0 0xd6 0x5e 0xff 0x7f 0x00 0x00’!). Once the core dump was ‘fixed’, we could load it into GDB and generate a valid backtrace, which confirmed our suspicions: there were more than 1.3 million frames on the stack! It was easy to extract a recurring sequence of frames from it:

#1309851 0x00007f8b7e936d40 in ?? ()
#1309852 0x00000000004c38bd in camlLwt__fun_838 ()
#1309853 0x000000001d457348 in ?? ()
#1309854 0x00000000004c2a3d in camlLwt__fun_724 ()
#1309855 0x00007fffb67bd570 in ?? ()
#1309856 0x00000000004c2a21 in camlLwt__fun_724 ()
#1309857 0x00000000028a2bf8 in ?? ()
#1309858 0x00000000004c3cd0 in camlLwt__run_waiters_rec_207 ()
#1309859 0x000000001d457330 in ?? ()
#1309860 0x000000001d457320 in ?? ()
#1309861 0x00000000007d8dd0 in camlLwt__77 ()
#1309862 0x00000000004c3d7e in camlLwt__run_waiters_231 ()
#1309863 0x0000000000000001 in ?? ()
#1309864 0x00000000004c65fd in camlLwt_mutex__fun_112 ()

As you can see, the OCaml compiler uses a name mangling scheme to generate symbol names. Some are obvious (‘camlLwt__run_waiters_rec’ is ‘Lwt.run_waiters_rec’), some are not: all ‘*__fun_*’ symbols are due to anonymous closures. Using the assembly code of these functions, alongside the Lwt source code, we were able to reconstruct what was going on.

Finally, we figured out what caused the stack overflows we were experiencing, and could create two small test cases exposing the issue at a more manageable scale.

A work-around was created, and we reported the issue to the Lwt developers, who fixed it some days later; check the thread on their mailing list here.
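
As an aside: computing such a little-endian byte sequence by hand is error-prone; a couple of lines of Python (using the example address from above) do the trick:

import struct

ADDRESS = 0x00007fff5ed6b000

# x86-64 stores values little-endian, so packing the address as a
# little-endian unsigned 64-bit integer yields its in-memory bytes
print ' '.join('0x%02x' % ord(c) for c in struct.pack('<Q', ADDRESS))
# Prints: 0x00 0xb0 0xd6 0x5e 0xff 0x7f 0x00 0x00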


A couple of days ago, we got a new report of segmentation faults occurring. We started to investigate the core dump using GDB yet again, noticed a sensible backtrace could not be generated, and attempted to alter the core dump again, just like some months before. Here’s what GDB told us:

#0 0x00000000005617c2 in camlLwt__try_bind_1339 ()
(gdb) bt
#0 0x00000000005617c2 in camlLwt__try_bind_1339 ()
#1 0x00007fff5ed6b040 in ?? ()
#2 0x00000000005617a6 in camlLwt__try_bind_1339 ()
#3 0x0000000000000000 in ?? ()
(gdb) p/x $rsp
$1 = 0x7fff5ed6b000
(gdb) p/x $rip
$2 = 0x5617c2

As you can see, the stack pointer is at a 4k boundary again. Reading the disassembly of the failing function tells us where the stack data of the previous frame lives:

(gdb) disassemble $rip
Dump of assembler code for function camlLwt__try_bind_1339:
   0x0000000000561790 <+0>:	sub    $0x28,%rsp
   0x0000000000561794 <+4>:	mov    %rax,%rsi
   0x0000000000561797 <+7>:	mov    %rbx,0x18(%rsp)
   0x000000000056179c <+12>:	mov    %rdi,0x10(%rsp)
   0x00000000005617a1 <+17>:	callq  0x5617b0 <camlLwt__try_bind_1339+32>
   0x00000000005617a6 <+22>:	callq  0x560d70
   0x00000000005617ab <+27>:	jmp    0x5617ca <camlLwt__try_bind_1339+58>
   0x00000000005617ad <+29>:	nopl   (%rax)
   0x00000000005617b0 <+32>:	push   %r14
   0x00000000005617b2 <+34>:	mov    %rsp,%r14
   0x00000000005617b5 <+37>:	mov    $0x1,%rax
   0x00000000005617bc <+44>:	mov    (%rsi),%rdi
   0x00000000005617bf <+47>:	mov    %rsi,%rbx
=> 0x00000000005617c2 <+50>:	callq  *%rdi
   0x00000000005617c4 <+52>:	pop    %r14
   0x00000000005617c6 <+54>:	add    $0x8,%rsp
   0x00000000005617ca <+58>:	callq  0x560630

The function subtracts 0x28 from ‘rsp’ at entry, so the previous frame uses ‘rsp + 0x28’ as its stack pointer, which we can print:

(gdb) x/64 $rsp + 0x28
0x7fff5ed6b028: 0x58002ef0 0x00007f08 0x0774cd58 0x00000000
0x7fff5ed6b038: 0x0055eda5 0x00000000 0x5ed6b0b0 0x00007fff
0x7fff5ed6b048: 0x0055ed87 0x00000000 0x0774cd58 0x00000000
0x7fff5ed6b058: 0x00560785 0x00000000 0x0774cd48 0x00000000
0x7fff5ed6b068: 0x00000001 0x00000000 0x58002f68 0x00007f08
0x7fff5ed6b078: 0x0056080e 0x00000000 0x00000001 0x00000000
0x7fff5ed6b088: 0x00563cdd 0x00000000 0x58002fb8 0x00007f08
0x7fff5ed6b098: 0x0056006d 0x00000000 0x58002f78 0x00007f08
0x7fff5ed6b0a8: 0x0055eda5 0x00000000 0x5ed6b120 0x00007fff
0x7fff5ed6b0b8: 0x0055ed87 0x00000000 0x0774cb30 0x00000000
0x7fff5ed6b0c8: 0x00560785 0x00000000 0x0774cb20 0x00000000
0x7fff5ed6b0d8: 0x00000001 0x00000000 0x58003050 0x00007f08
0x7fff5ed6b0e8: 0x0056080e 0x00000000 0x00000001 0x00000000
0x7fff5ed6b0f8: 0x00563cdd 0x00000000 0x580030a0 0x00007f08
0x7fff5ed6b108: 0x0056006d 0x00000000 0x58003060 0x00007f08
0x7fff5ed6b118: 0x0055eda5 0x00000000 0x5ed6b190 0x00007fff

If you take a close look, you can see a recurring pattern starting at 0x7fff5ed6b038, at least partially: these are the return addresses of the functions taking part in the recursion loop, and their (varying) arguments. By feeding these return addresses to ‘info symbol’, the names of the functions can be retrieved. They showed a lot of resemblance with what we saw before as well…
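
For example (the offset in the output is made up for illustration; the symbol name matches the summary shown further down):

(gdb) info symbol 0x55eda5
camlLwt__fun_1826 + 29 in section .text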

We dug up the test cases we used before and compiled them using Lwt 2.3.0, the version against which the crashing Arakoon binary was compiled as well, and our fear was confirmed: the same issue had been introduced in Lwt again, a regression! Luckily, the issue was fixed again later on: compiling the tests using Lwt 2.3.2 confirmed this. We had builds of Arakoon against Lwt 2.3.2 around, so production systems could be upgraded to a safe version again. I guess we should add the Lwt test cases to the Arakoon test suite.


I read about the Python scripting support introduced in recent versions of GDB a couple of weeks ago, and decided to automate the debugging of this issue, if possible. As such, I created a script which implements a new GDB command, ‘ocaml-detect-recursion’, which takes an optional stack address (it defaults to ‘rsp’), reads some stack memory, and attempts to find return addresses in these bytes. The buffer is scanned by reading 8 bytes at a time (we’re on a 64-bit system, which uses 8-byte addresses), then checking whether the value falls inside a known function. If it does, it’s appended to a list.

Once this is done, a minimal recurring pattern is detected, if possible. The algorithm used here is extremely simple at the moment (split the address list into n-grams for increasing values of n, then check whether all n-grams are identical), so it could use some love.
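
To give an idea of what such a command involves, here’s a minimal sketch using the GDB Python API. This is not the actual ‘ocaml-detect-recursion’ implementation: the command name, the fixed 4 KiB scan size and the pattern detection are simplified assumptions.

import struct

import gdb

class DetectRecursionSketch(gdb.Command):
    '''Scan stack memory for return addresses and report a recurring
    pattern, if any (illustrative sketch only).'''

    def __init__(self):
        gdb.Command.__init__(
            self, 'detect-recursion-sketch', gdb.COMMAND_STACK)

    def invoke(self, arg, from_tty):
        # Default to the current stack pointer if no address is given
        start = int(gdb.parse_and_eval(arg.strip() or '$rsp'))
        data = gdb.selected_inferior().read_memory(start, 4096)

        # Read 8 bytes at a time, and keep the values 'info symbol'
        # recognizes as lying inside a known function
        names = []

        for offset in xrange(0, len(data) - 7, 8):
            value, = struct.unpack_from('<Q', data, offset)
            info = gdb.execute('info symbol 0x%x' % value, to_string=True)

            if not info.startswith('No symbol'):
                names.append(info.split()[0])

        # Extremely naive pattern detection, as described above: find
        # the smallest n such that the list repeats its first n entries
        for n in xrange(1, len(names) // 2 + 1):
            count, rest = divmod(len(names), n)

            if rest == 0 and names == names[:n] * count:
                gdb.write('Recurring call pattern:\n')

                for name in names[:n]:
                    gdb.write('%s\n' % name)

                return

        gdb.write('No recurring pattern found\n')

DetectRecursionSketch()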

Once the recurring pattern is detected, a summary, including the original function names, is displayed:

(gdb) source ocaml_detect_recursion.py
(gdb) ocaml-detect-recursion
Recurring call pattern starting at frame 1
==========================================
camlLwt__fun_1826 @ 0x55eda5
camlLwt__fun_1826 @ 0x55ed87
camlLwt__run_waiters_rec_1181 @ 0x560785
camlLwt__run_waiters_1199 @ 0x56080e
camlLwt_mutex__fun_1084 @ 0x563cdd
camlLwt__fun_1956 @ 0x56006d

Using this plugin, it becomes really easy to debug this issue if it ever occurs again in the future.

Scripting GDB takes some time (go check the docs!), but I do think it’s worth it. Another interesting feature is custom pretty-printers (one for OCaml values might be useful, *hint hint*).

To conclude, you can find the GDB script here and here. Feel free to fork and enhance it, or let us know if you were able to debug an issue using it!

Finally, thanks to our colleagues at Amplidata for their support debugging the crash the first time.

Interested in working on issues like this yourself? Take a look at our jobs page, we’re looking for colleagues.