User functions in Arakoon

Mahomet cald the Hill to come to him. And when the Hill stood still, he was neuer a whit abashed, but said;
If the Hill will not come to Mahomet, Mahomet wil go to the hill.

Francis Bacon

Introduction

Arakoon tries to be a simple distributed key value store that favours consistency over availability.
From time to time, we get feature requests for additional commands like:

  • assert_exists: assert a value exists for the key without caring what the value actually is
  • increment_counter: increment (or create) a counter and return the new value.
  • queue operations: add an element to the front/back of a double-ended queue, or pop an element
  • set_and_update_X: insert a key value pair and update some non-trivial counter X (think averages, variances,…)

The list is semi-infinite, and the common theme is that these operations are too complex/specific/weird/… to perform in one step using the provided interface. Of course, you can do all of this on the client side, but it costs extra network round-trips. In distributed systems, you really want to keep the number of round-trips low, which pushes you towards this kind of feature request.

Once you’ve decided (probably because of a performance bottleneck) that you need extra functionality, there are two things you can do. You can try to force or entice us into adding it to the core interface, or alternatively, you can get by using Arakoon’s “user functions”. For some reason people fear them, but there’s no real technical reason to do so.

This blog post will cover two things. First we’ll go into the nitty-gritty of coding and deploying user functions, and then we’ll look at some of the strategic/architectural challenges of user functions.

How do user functions work?

The high level view is this: you build a user function and register it to an Arakoon cluster before you start it. Then, at runtime, you can call it, using any client, with a parameter (a string option) and get back a result (a string option). On the server side, the master will log the call in its transaction log and try to reach consensus with the slave(s); once that is the case, the user function will be executed inside a transaction, and the result of that call will be sent to the client. If an exception occurs, the transaction will be aborted. Since Arakoon logs transactions, it can replay them in case of calamities. This has a very important impact. Since Arakoon needs to be able to replay the execution of a user function, the function must be deterministic: you cannot perform side effects, use random values or read the system clock.

Running Example

We’re going to try to build a simple queue API.
It will offer named queues with 2 operations: push and pop. Also, it’s a first-in-first-out thingy.

Arakoon 1

Client side API

Arakoon 1 offers the following API for user functions.

def userFunction(self, name, argument):
    '''Call a user-defined function on the server

    @param name: Name of user function
    @type name: string
    @param argument: Optional function argument
    @type argument: string option

    @return: Function result
    @rtype: string option
    '''

Let’s take a look at it. A userFunction call needs a name, which is a string, and an argument, which is a string option; it returns a result of type string option. So what exactly is a string option in Python? Well, it’s either a string or None. This allows a user function to take no input, or to yield no result.

Server side API

The server side API is in OCaml, and looks like this:


class type user_db =
object
  method set : string -> string -> unit
  method get : string -> string
  method delete : string -> unit
  method test_and_set : string -> string option -> string option -> string option
  method range_entries : string option -> bool -> string option -> bool -> int
                         -> (string * string) list
end

A user function on the server side matches the client’s opaque signature:

user_db -> string option -> string option
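
For instance, a trivial user function that merely looks up the key passed as its argument could look like this (a minimal sketch; the name “Demo.read_key” is purely illustrative):

open Registry

(* return the value stored under the given key, or None when absent *)
let read_key (user_db : user_db) = function
  | None -> None
  | Some k -> (try Some (user_db # get k) with Not_found -> None)

let () = Registry.register "Demo.read_key" read_key

The queue below follows exactly the same pattern, just with more bookkeeping.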

Queue’s client side

Let’s create the client side in Python. We’ll create a class that uses an Arakoon client and acts as a queue. The problem with push is that we need to fit both the name and the value into the one parameter we have available, so we need to do our own serialization. Let’s just be lazy (smart?) and use Arakoon’s serialization. The code is shown below.

from arakoon import Arakoon
from arakoon import ArakoonProtocol as P

class ArakoonQueue:
    def __init__(self, name, client):
        self._name = name
        self._client = client

    def push(self, value):        
        input =   P._packString(self._name) 
        input +=  P._packString(value)
        self._client.userFunction("QDemo.push", input)

    def pop(self):
        value = self._client.userFunction("QDemo.pop", self._name)
        return value



That wasn’t too hard now was it?

Queue, server side

The whole idea is that the operations happen on the server side, so this will be a tad more complex.
We need to model a queue using a key value store. Code-wise, that’s not too difficult.
For each queue, we’ll keep 2 counters that keep track of both ends of the queue.

A push merely gets the qname and the value out of the input, calculates the place where the value needs to be stored, stores it there and updates the counter for the back of the queue. A pop is similar, but when the queue becomes empty, we use the opportunity to reset the counters (maybe_reset_counters). The counter representation is a bit weird, but Arakoon stores entries in lexicographical order and we want to take advantage of this to keep our queue FIFO. Hence, we need to encode the counter in such a way that the order of the encoded counters, as strings, is the same as their numerical order.
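
For instance, the successive counter values produced by next_counter (shown below) run

"" < "A" < "B" < … < "H" < "HA" < "HB" < … < "HH" < "HHA" < …

so the lexicographical order coincides with the generation order. The code is shown below.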

(* file: plugin_qdemo.ml *)

open Registry 

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else let () = s.[last] <- Char.chr(Char.code c + 1) in 
             s
      end

let log x= 
  let k s = let s' = "[plugin_qdemo]:" ^ s in
            Lwt.ignore_result (Lwt_log.debug s')
  in
  Printf.ksprintf k x

let maybe_reset_counters user_db qname b1 = 
  let e_key = end_name qname in
  let exists = 
    try let _ = user_db # get e_key in true with Not_found -> false 
  in
  if exists
  then 
    let ev = user_db # get e_key in
    if ev = b1 then
      let b_key = begin_name qname in
      let () = user_db # set b_key zero in
      let () = user_db # set e_key zero in
      ()
    else
      ()
  else ()

let push user_db vo = 
  match vo with
    | None -> invalid_arg "push None"
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        let e_key = end_name qname in
        let b0 = 
          try user_db # get (end_name qname) 
          with Not_found -> zero 
        in
        let b1 = next_counter b0 in
        let () = user_db # set (qprefix qname b1) value in
        let () = user_db # set e_key b1 in
        None

let pop user_db vo =
  match vo with 
    | None   -> invalid_arg "pop None"
    | Some qname -> 
        let b_key = begin_name qname in
        let b0 = 
          try user_db # get (begin_name qname) 
          with Not_found -> zero
        in
        let b1 = next_counter b0 in
        try 
          let k = qprefix qname b1 in
          let v = user_db # get k in 
          let () = user_db # set b_key b1 in
          let () = user_db # delete k in
          let () = maybe_reset_counters user_db qname b1 in
          Some v
        with
          Not_found ->
            let e_key = end_name qname in
            let () = user_db # set b_key zero in
            let () = user_db # set e_key zero in
            None
              

let () = Registry.register "QDemo.push" push
let () = Registry.register "QDemo.pop" pop

The last two lines register the functions with Arakoon when the module is loaded.

Compilation

So how do you deploy your user function module into an Arakoon cluster?
First, you need to compile your module into something that can be dynamically loaded.
To compile plugin_qdemo.ml, I persuade ocamlbuild like this:

ocamlbuild -use-ocamlfind -tag 'package(arakoon_client)' \
-cflag -thread -lflag -thread \
plugin_qdemo.cmxs

It’s not too difficult to write your own testcase for your functionality, so you can run it outside of Arakoon and concentrate on getting the code right.
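
As an illustration, here’s a minimal sketch of such a testcase; it replaces Arakoon’s store with a hand-rolled in-memory object (the file name and the mock are hypothetical, and only the methods the queue actually uses are implemented):

(* file: test_qdemo.ml *)

open Registry
open Plugin_qdemo

let make_mock_db () : user_db =
  let h = Hashtbl.create 16 in
  object
    method set k v = Hashtbl.replace h k v
    method get k = Hashtbl.find h k (* raises Not_found, like the real store *)
    method delete k = Hashtbl.remove h k
    method test_and_set _ _ _ = failwith "not used by the queue"
    method range_entries _ _ _ _ _ = failwith "not used by the queue"
  end

let () =
  let db = make_mock_db () in
  assert (pop db (Some "q") = None); (* popping an empty queue yields None *)
  print_endline "ok"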

Deployment

First, you need to put your compilation unit into the Arakoon home directory on all nodes of the cluster. Second, you need to add its name to the global section of your cluster configuration. Below, I show the configuration file for my simple, single node cluster called ricky.


[global]
cluster = arakoon_0
cluster_id = ricky

### THIS REGISTERS THE USER FUNCTION:
plugins = plugin_qdemo

[arakoon_0]
ip = 127.0.0.1
client_port = 4000
messaging_port = 4010
home = /tmp/arakoon/arakoon_0

All right, that’s it. Just a big warning about user functions here.

Once a user function is installed, it needs to remain available, with the same functionality, for as long as user function calls are stored inside the transaction logs, as they need to be re-evaluated when a transaction log is replayed against a store (for example when a node crashed, leaving a corrupt database behind). It’s not a bad idea to include a version in the name of a user function to cater for evolution.
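
For the queue above, that could for instance mean registering the functions under versioned names (the names are purely illustrative):

let () = Registry.register "QDemo.push_v1" push
let () = Registry.register "QDemo.pop_v1" pop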

Demo

Let’s use it in a simple python script.

from arakoon import Arakoon
# plus the ArakoonQueue class defined above

def make_client():
    clusterId = 'ricky'
    config = Arakoon.ArakoonClientConfig(clusterId,
                                         {"arakoon_0":("127.0.0.1", 4000)})
    client = Arakoon.ArakoonClient(config)
    return client

if __name__ == '__main__':
    client = make_client()
    q = ArakoonQueue("qdemo", client)
    q.push("bla bla bla")
    q.push("some more bla")
    q.push("3")
    q.push("4")
    q.push("5")
    print q.pop()
    print q.pop()
    print q.pop()
    print q.pop()

with the expected results.
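
Since the queue is first-in-first-out, the four pops should print the first four values in insertion order:

bla bla bla
some more bla
3
4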

Arakoon 2

With Arakoon 2 we moved to Baardskeerder as a database backend, replacing the combination of transaction logs and Tokyo Cabinet. Since the backend is Lwt-aware, the server side API has become Lwt-aware too:

module UserDB :
sig
  type tx = Core.BS.tx
  type k = string
  type v = string
  val set    : tx -> k -> v -> unit Lwt.t
  val get    : tx -> k -> (v, k) Baardskeerder.result Lwt.t
  val delete : tx -> k -> (unit, Baardskeerder.k) Baardskeerder.result Lwt.t
end

module Registry :
sig
  type f = UserDB.tx -> string option -> (string option) Lwt.t
  val register : string -> f -> unit
  val lookup : string -> f
end

The major changes are that

  • the API now uses Lwt
  • we have ('a,'b) Baardskeerder.result types, which we favour over the use of exceptions for normal cases.

Rewriting the queue implementation to Arakoon 2 yields something like:

(* file: plugin_qdemo2.ml *)

open Userdb
open Lwt
open Baardskeerder

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else let () = s.[last] <- Char.chr(Char.code c + 1) in 
             s
      end

let reset_counters tx qname =
  let b_key = begin_name qname in
  let e_key = end_name qname in
  UserDB.set tx b_key zero >>= fun () ->
  UserDB.set tx e_key zero


let maybe_reset_counters tx qname (b1:string) =
  let e_key = end_name qname in
  UserDB.get tx e_key >>= function
    | OK ev -> if ev = b1 then reset_counters tx qname else Lwt.return ()
    | NOK _ -> Lwt.return ()

let push tx vo = 
  match vo with
    | None -> Lwt.fail (Invalid_argument "push None")
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        Lwt_log.debug_f "push:qname=%S;value=%S" qname value >>= fun ()->
        let e_key = end_name qname in
        UserDB.get tx (end_name qname) >>= fun b0r ->
        let b0 = match b0r with
          | OK b0 -> b0
          | _     -> zero
        in
        let b1 = next_counter b0 in
        UserDB.set tx (qprefix qname b1) value >>= fun () ->
        UserDB.set tx e_key b1 >>= fun () ->
        Lwt.return None



let pop tx = function
  | None   -> Lwt.fail (Invalid_argument "pop None")
  | Some qname -> 
      begin
        let b_key = begin_name qname in
        UserDB.get tx (begin_name qname) >>= fun b0r ->
        begin
          match b0r with
            | OK b0 -> Lwt.return b0
            | NOK _ -> Lwt.return zero
        end
        >>= fun b0 ->
        let b1 = next_counter b0 in
        let k = qprefix qname b1 in
        UserDB.get tx k >>= fun vr ->
        begin
          match vr with
            | OK value -> 
                begin
                  UserDB.set tx b_key b1 >>= fun () ->
                  UserDB.delete tx k >>= function 
                    | OK () ->
                        begin
                          maybe_reset_counters tx qname b1 >>= fun () ->
                          Lwt.return (Some value)
                        end
                    | NOK e -> Lwt.fail (Failure e)
                end
             | NOK _  -> 
                 reset_counters tx qname >>= fun () ->
                 Lwt.return None
        end
      end
        
let () = Userdb.Registry.register "QDemo.push" push
let () = Userdb.Registry.register "QDemo.pop" pop

Both client side and deployment remain the same.

Questions asked

Ain’t there something wrong with this Queue?

Yes! Glad you noticed. This queue concept is fundamentally broken. The problem is the pop.
Follow this scenario:

  1. the client calls the QDemo.pop function
  2. the cluster pops the value from the queue and its master sends it to the client.
  3. the client dies before it can read the popped value

Now what? We’ve lost that value. Bloody network, how dare you!

Ok, I admit this was naughty, but it’s a good example of a simple local concept that doesn’t really amount to the same thing when tried in a distributed context. When confronted with this hole, people immediately try to fix this with “Right!, so we need an extra call to …”. To which I note: “But wasn’t this extra call just the thing you were trying to avoid in the first place?”

Why don’t you allow user functions to be written in <INSERT YOUR FAVOURITE LANGUAGE HERE>?

This is a good question, and there are several answers, most of them wrong. For example, anything along the lines of “I don’t like your stinkin’ language” needs to be rejected because a language’s cuteness is irrelevant.

There are several difficulties with the idea of offering user functions written in another programming language. For scripting languages like Python, Lua, PHP, … we can either implement our own interpreter and offer a subset of the language, which is a lot of work with a low return on investment, or integrate an existing interpreter/runtime, which will probably not play nice with Lwt or with the OCaml runtime (garbage collector). For compiled languages we might go via the FFI, but it’s still way more complex for us. So for now you’re stuck with OCaml for user functions. There are worse languages.

Wouldn’t it be better if you applied the result of the user function to the transaction log instead of the arguments?

Well, we thought about that a lot before we started with user functions. The alternative is to record and log the effect of the user function, so that we can always replay that effect later, even when the code is no longer available. It’s an intriguing alternative, but it’s not a clear improvement. It all depends on the size of the arguments versus the size of the effect.
Some user functions have a small argument set and a big effect, while for other user functions it’s the other way around.

Closing words

Technically, it’s not too difficult to hook your own functionality into Arakoon. Just make sure the thing you want to hook in does not have major flaws.

have fun,

Romain.


Announcing Baardskeerder

After approval by our CEO, we’re happy to announce the public availability of Baardskeerder, our implementation of a Copy-On-Write (append-only) B-tree-ish embedded database. The source is available on GitHub as part of the Incubaid organization at https://github.com/Incubaid/baardskeerder, under the LGPL-3 license.

This is a technology preview. Baardskeerder is still under heavy development, and breaking changes will occur before a first stable release is made.

The Baardskeerder project was initiated after we experienced performance issues when storing “large” (> 8KB) values in Tokyo Cabinet, the storage back-end used by our distributed key-value store Arakoon. Work has been started to integrate Baardskeerder in Arakoon as well.

If you’re interested in the name of the project, take a look at Wikipedia.


Hole-punching compaction of an append-only database

The Baardskeerder project, as explained before, is an implementation of a B-tree-ish data structure, using an append-only file to persist the data to a storage device (e.g. a hard drive or SSD device). Not overwriting data has several advantages (consistency guarantees, efficiency,…), yet it also incurs a major disadvantage: over time, an increasing amount of data is stored that won’t ever be reclaimed, even if the data itself is no longer of any use. For example, a value node may no longer be referenced because the corresponding key was deleted or a new value was stored, and internal nodes describing the tree itself become invalid because the tree structure changes when new key/value pairs are added.

Even though storage capacity becomes cheaper, there’s no such thing as an infinite disk, and not freeing disk space, even though the data it contains no longer has any value, is wasteful.

One way to reduce the overhead is to rewrite the database so it only contains the data present in the tree at the moment the action is initiated. We call this a “full compaction”: rewriting the database to a second file. This has a major advantage: after compaction, the new database file is the smallest you can get (it doesn’t contain any overhead data), and as such the data is stored very densely.

There are some drawbacks as well: the operation can’t be performed online without taking specific measures, since the resulting database will only represent a snapshot of the original one. There might be a storage capacity issue as well: because the data is copied to another file, we need enough free disk space available to perform these copies. In the worst case, if no data can be discarded at all, we’ll need 100% of the original database size as free space, i.e. when compacting to the same disk, the size of the original database should be less than 50% of the total disk capacity. Do note this really is a worst-case scenario.

Next to this, when the output of the compaction process is stored on the hard disk which also contains the original database, performance will suffer: lots of random IO is needed to read the original database, and because of this the IO operations writing the output file, which would be sequential and append-only during normal operation, become random as well (they are interleaved with the random reads).

Overall, even though the “full compaction” mechanism yields the best result in the end, it’s not something you want to run too frequently. On the other hand, we do want to return the overhead storage to the system so it can be reused for more useful purposes. This operation should be cheap, so it can be executed more frequently, and the number of IO operations required to perform the operation should be as low as possible.


First, a little intermezzo (you can skip this if you know what a ‘sparse’ file is). Most “modern” filesystems, especially (but not only) in the UNIX world, have support for ‘sparse’ files. A sparse file is a file which contains empty regions, blocks which appear to be filled with zeros, but aren’t really mapped to blocks on the block device to which the filesystem writes its data. These empty regions can be added to the file when it’s being written (you can extend a file with such empty space), and data can be written to these empty regions later on (at this point in time, the blocks are mapped to physical blocks, of course).

These ’empty’ regions are taken into account when determining the size of the file, but they don’t occupy any space on the disk device. When reading from these regions, all you get are ‘zero’ bytes.

Here’s a simple demonstration, creating a large (2GB) sparse file containing no data at all, then displaying its apparent size as well as the actual size on disk:

[nicolas@tau ~]$ dd if=/dev/null of=demo.dat seek=$((2 * 1024 * 1024)) bs=1k
0+0 records in
0+0 records out
0 bytes (0 B) copied, 2.6617e-05 s, 0.0 kB/s
[nicolas@tau ~]$ ls -lh demo.dat
-rw-rw-r--. 1 nicolas nicolas 2.0G Dec 16 16:10 demo.dat
[nicolas@tau ~]$ du --block-size=1 demo.dat
0 demo.dat

This shows the size of the generated file appears to be 2.0GB, but it’s stored to disk using exactly 0 bytes (next to the inode/metadata size, of course).

During normal operation, one can create a sparse region in a file by using the ftruncate(2) system call. This only allows appending an empty region to the end of a file, not marking an existing region as ’empty’.


Luckily, (very) recent Linux kernel versions allow this on a couple of file systems: support for the FALLOC_FL_PUNCH_HOLE option of the fallocate(2) call is available in at least ext4, XFS and OCFS2. Using this option, an existing region in a file can be marked as ’empty’, even though it contained data before. The file system will change its mappings so the region becomes sparse, and zero out any ‘overflow’ bytes at either edge of the region (since only whole blocks can be unmapped).

Thanks to this feature, we can implement cheap resource deallocation by punching holes in a database file for every region which is no longer of any use. Blocks will become available again, and the file system will handle defragmentation (if required), whilst all offsets encoded in the internal nodes will remain valid.


Now that we know the mechanism to free unused resources, we need to find out which resources can be freed, i.e. calculate where we can punch holes in the database file. Initially, we thought about walking the tree, keeping track of the offset and size of all referenced entries as a list of integer pairs, then merging adjacent pairs, and finally punching holes at all areas of the file which aren’t part of this final map.

This approach has a major drawback: a lot of state is accumulated throughout the traversal, which results in high memory usage; the number of entries required to represent a large database can be huge!

Thanks to a very basic observation, a simpler, more efficient and elegant algorithm is available though! Here’s the deal: entries are added to the database file using append-only writes. Any entry can contain pointers to other entries, encoded as the offsets of these entries in the file. An entry can, at all times, only point to other entries that were stored before itself. As such, an entry can only point to entries on its ‘left’, at lower offsets; an entry can never reference another entry on its ‘right’.

This simplifies things a lot, and allows us to implement the following recursive algorithm: the state passed at every function call consists of only 2 values: a set of offsets (integers) S, and a single offset (integer) O.

In every iteration, the largest integer in set S is determined, and removed from the set. Let’s call this value C. This is the offset of the entry we’ll handle during this iteration. We read the entry from disk, and parse it. If it’s an entry which references other entries (i.e. a Leaf or Index entry), we add the offsets of every entry it references to set S. Do note, at all times, these offsets will be smaller than C (entries can only refer to other entries found earlier in the file!).

Next, we calculate the size L of the entry we’re dealing with, and calculate the offset of the end of the entry in the file, E = C + L.

At this point, we can safely punch a hole from E to O (read on to know what O represents).

Now we check whether S is empty. If it is, there are no more referenced entries below offset C. As such, we can punch a hole from the beginning of the file (or, in the Baardskeerder on-disk format, after the metadata blocks) up to C.

Otherwise, if S isn’t empty, a recursive call is made, passing the updated set and using C as value for O.

Finally, we need to initiate the operation. Here’s how: first, find the commit entry of the oldest version of the database to be retained (remember, Baardskeerder is a functional/persistent data structure!). Any older versions (with commit entries at lower offsets) will become invalid because of the compaction process. Now, the recursive process can be initiated using the singleton set containing the offset of the root entry to which the commit entry points as S, and the offset of the commit entry as value of O.

Note that set S will, unlike the list in the initial approach, never be very big: at all times, it only contains the offsets of the known referenced entries to the left of the one we’re dealing with; it never contains the offsets of all entries in the database.

Here’s a Haskell implementation of the algorithm, which might clarify some things:


import Prelude hiding (null)
import Data.ByteString (ByteString)
import Data.Int (Int32)
import Data.IntSet (IntSet)
import qualified Data.IntSet as IS
import System.Posix (COff, Fd)

type Key = ByteString
type Value = ByteString

type Offset = COff
type Length = Int32

data Entry = Commit Offset Length Offset
           | Leaf Offset Length Value
           | Node Offset Length [(Key, Offset)]

getEntry :: Fd -> Offset -> IO Entry
getEntry f o = undefined

punch :: Fd -> Offset -> Length -> IO ()
punch f o l = undefined

data CompactState = CS Offset IntSet

fileStart :: Offset
fileStart = 4096

compact :: Fd -> Offset -> IO ()
compact f o = do
    c <- getEntry f o
    case c of
        Commit _ _ r -> compact' f $ CS o (IS.singleton $ fromIntegral r)
        otherwise -> error "compact: not a commit entry"

compact' :: Fd -> CompactState -> IO ()
compact' f (CS n os) = do
    -- At this stage, s should never be empty
    let (h, os') = IS.deleteFindMax os
        h' = fromIntegral h

    e <- getEntry f h'

    let (e', os'') = case e of
            Leaf o l _ -> (o + fromIntegral l, os')
            Node o l rs ->
                (o + fromIntegral l,
                    foldr (IS.insert . fromIntegral) os' $ map snd rs)
            otherwise -> error "compact': invalid entry type"

    punch f e' $ fromIntegral (n - e')

    if (IS.null os'')
    then punch f fileStart $ fromIntegral (h' - fileStart)
    else compact' f (CS h' os'')

To conclude, some related “real-world software engineering” notes:

  • Since only blocks can be unmapped, calls to “punch” should be block-aligned, otherwise the overflow is filled with zeroes, which results in completely useless and unnecessary random write operations (see the sketch after this list)
  • One might want to use a threshold and only punch if the hole would be big enough (e.g. at least 10 consecutive blocks): creating lots of small holes can introduce unwanted fragmentation (this depends on the usage pattern of your database)
  • It’s possible to be “smart” and only punch holes at regions which are still backed by physical blocks (the algorithm doesn’t, and shouldn’t, take any existing holes into account!). There are 2 ways to request information about existing holes from the file system: using ioctl(2) with the FS_IOC_FIEMAP option, if the file system supports it, or using lseek(2) with SEEK_HOLE and SEEK_DATA (which also requires file system support to work correctly). We haven’t yet tested whether this provides any benefit.
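
To illustrate the first note, here’s a minimal sketch of such block alignment, written in OCaml (the language Baardskeerder is implemented in); the 4 KiB block size and the function name are assumptions:

let block_size = 4096L (* assumed block size *)

(* Shrink [off, off + len) inwards to block boundaries.
   Returns None when not even one whole block fits inside the region:
   punching would then merely zero out parts of blocks. *)
let align_punch off len =
  let open Int64 in
  let start = mul (div (add off (sub block_size 1L)) block_size) block_size in
  let stop = mul (div (add off len) block_size) block_size in
  if stop > start then Some (start, sub stop start) else None

A threshold as in the second note then becomes a simple extra check on the returned length.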

Baardskeerder’s transaction strategy

Baardskeerder is a simple embedded database based around an append-only B-tree-ish data structure.
It’s a dictionary that also supports range queries and transactions.
Baardskeerder is implemented in OCaml, and the main idea is that it will replace Tokyo Cabinet in our Arakoon key-value store.
This post will explain our approach regarding transactions.

Preliminaries

Baardskeerder appends slabs to a log. Slabs are lists of entries, and entries can be
values, leaves, indexes, or commits. Suppose we start with an empty log, and add a first update (set “f” “F”).
Baardskeerder builds a slab that looks like this:

Value "F"
Leaf ["f", Inner 0]
Commit (Inner 1)

This slab gets serialized, and added to a log, which looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)

Notice there are Outer and Inner references. Outer ones refer to positions in the log, while Inner ones refer to the current slab. During serialization, each inner position needs to be translated into an outer one, while outer positions can just be copied. (In reality, outer positions are a bit more complex to calculate, as the size of entries after serialization influences them, but I’ve abstracted away from that problem here.)
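
In simplified form, ignoring the serialization-size issue just mentioned, the translation boils down to the following sketch (the pos type mirrors the Inner/Outer notation used in the listings):

type pos = Inner of int | Outer of int

(* translate a slab-local position, given the number of entries
   already in the log; log positions are copied as-is *)
let resolve log_len = function
  | Inner i -> Outer (log_len + i)
  | Outer o -> Outer o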

When a slab is ready to be serialized and added, it always ends with a Commit node. This commit node is necessary because there might be a calamity while writing out the serialized slab, so it might not end up entirely in the log. Hence, after a successful update, the last entry in the log is a Commit pointing to the current root node.
To fully appreciate the difference between slab and log references,
let’s add another update (set “d” “D”).
The new slab looks like this:

Value "D"
Leaf ["d", Inner 0; "f", Outer 0]
Commit (Inner 1)

The new leaf in the slab refers both to the value at position 0 in the log (Value “F”) and to the value at position 0 in the slab (Value “D”).
After serialization of the slab, and writing it to the log, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)

Let’s add some more updates to have a more interesting log:

 set "h" "H";
 set "a" "A";
 set "z" "Z"
 

After which, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)
Value "H"
Leaf ["d", Outer 3; "f", Outer 0; "h", Outer 6]
Commit (Outer 7)
Value "A"
Leaf ["a", Outer 9; "d", Outer 3]
Leaf ["f", Outer 0; "h", Outer 6]
Index Outer 10, ["d", Outer 11]
Commit (Outer 12)
Value "Z"
Leaf ["f", Outer 0; "h", Outer 6; "z", Outer 14]
Index Outer 10, ["d", Outer 15]
Commit (Outer 16)

This corresponds to the following tree:
tree after 5 sets

There are several little things to notice here. First, leaves that overflow (size = 4) are split into two new leaves, with an Index entry referring to them both. Also, the entries in the log only refer to previous entries, and, most importantly, nothing ever changes inside the log.

Transactions

Suppose that by some accident the process dies while writing the slab, so the last slab doesn’t quite fully make it to the log. In the above example, the log might end up just after the value “Z” (or a bit further, with half a leaf written).
At the next opening of the log, we search for the last commit, and the log gets truncated just after position 13. As such, it is the presence of Commit entries that makes updates atomic.

They also provide a means to implement non-concurrent transactions. In fact, if we only have a commit entry at the end of a slab, the slab literally is the transaction. At the beginning of the transaction, we create a slab, and we pass it along for all updates. A commit entry is written at the end of the slab when we’re done. The only added complexity comes from descending the tree:
we must start at the root in the slab (or in the log if the slab is empty), and when we have to jump to an entry, we look at its position and jump into the slab if it’s an inner position, and into the log if it’s an outer one. (How to descend a tree is described in more detail in a previous blog post.)
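
A minimal sketch of that jump rule, reusing the pos type from the sketch above (the two accessors are passed in as assumptions, since their real implementations read the slab array and the log file):

(* dispatch a position to the right lookup function *)
let fetch ~slab_entry ~log_entry = function
  | Inner i -> slab_entry i (* not yet serialized: look in the current slab *)
  | Outer o -> log_entry o  (* already persisted: look in the log *)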

Our transactional API is simple and clean:

 module DBX :
  sig
    type tx
    val get : tx -> k -> v
    val set : tx -> k -> v -> unit
    val delete : tx -> k -> unit

    ...

    val with_tx : log -> (tx -> unit) -> unit
  end

The following code fragment illustrates our transaction API:

let xs = ["f","F";
	  "d","D";
	  "h","H";
	  "a","A";
	  "z","Z";
	 ]
in
DBX.with_tx log
  (fun tx ->
    List.iter (fun (k,v) ->
      DBX.set tx k v) xs
  )

Suppose we apply this transaction to an empty log; we then end up with the following:

Value "F"
Leaf ["f", Outer 0]
Value "D"
Leaf ["d", Outer 2; "f", Outer 0]
Value "H"
Leaf ["d", Outer 2; "f", Outer 0; "h", Outer 4]
Value "A"
Leaf ["a", Outer 6; "d", Outer 2]
Leaf ["f", Outer 0; "h", Outer 4]
Index Outer 7, ["d", Outer 8]
Value "Z"
Leaf ["f", Outer 0; "h", Outer 4; "z", Outer 10]
Index Outer 7, ["d", Outer 11]
Commit (Outer 12)

Performance Summary: each transaction boils down to 1 slab, and each slab is written using 1 system call (write).

Slab Compaction

Batching updates in transactions uses less space than individual updates, but we still have a lot of dead entries. Take for example the log above: entries 1, 3, 5, 8 and 9 are not useful in any way. So it would be desirable to prune them from the slab before they even hit the log.
This turns out to be surprisingly easy.

First we have to find the dead entries, which is done with a simple iteration through the slab, starting from the root and marking every Inner position as known. When you’re done, the positions you didn’t mark are garbage.

type t = { mutable es: entry array; mutable nes: int}

let mark slab =
  let r = Array.make (slab.nes) false in
  let maybe_mark = function
    | Outer _ -> ()
    | Inner x -> r.(x) <- true
  in
  let maybe_mark2 (_,p) = maybe_mark p in
  let mark _ e =
    match e with
      | NIL | Value _ -> ()
      | Commit p -> maybe_mark p
      | Leaf l -> List.iter maybe_mark2 l
      | Index (p0,kps) -> let () = maybe_mark p0 in List.iter maybe_mark2 kps
  in
  let () = iteri_rev slab mark in (* iteri_rev iterates over the slab in reverse order *)
  let () = r.(slab.nes -1) <- true in (* the last entry, the Commit, is always live *)
  r

After we have marked the dead entries, we iterate through the slab again, from position 0 towards the end, and build a mapping that tells you how to translate an old Inner position to a new Inner position.

let mapping mark =
  let s = Array.length mark in
  let h = Hashtbl.create s in
  let rec loop i o =
    if i = s then h
    else
      let v = mark.(i) in
      let i' = i + 1 in
      let () = Hashtbl.add h i o in
      let o' = if v then o + 1 else o in
      loop i' o'
  in
  loop 0 0

The last phase is the actual rewriting of the slab, without the dead entries. This again is a simple iteration.

let rewrite s s_mark s_map =
  let lookup_pos = function
    | Outer x -> Outer x
    | Inner x -> Inner (Hashtbl.find s_map x)
  in
  let rewrite_leaf kps       = List.map (fun (k,p) -> (k,lookup_pos p)) kps in
  let rewrite_index (p0,kps) = (lookup_pos p0 , rewrite_leaf kps) in
  let rewrite_commit p       = lookup_pos p in
  let esa = s.es in
  let size = s.nes in
  let r = Array.make size NIL in
  let rec loop c i =
    if i = size
    then { es = r; nes = c}
    else
      begin
	let i' = i + 1 in
	let a = s_mark.(i) in
	if a then
	  let e = esa.(i) in
	  let e' = match e with
	    | Leaf  l  -> Leaf (rewrite_leaf l)
	    | Index i  -> Index (rewrite_index i)
	    | Commit p -> Commit (rewrite_commit p)
	    | Value _
	    | NIL -> e
	  in
	  let () = r.(c) <- e' in
	  let c' = c + 1 in
	  loop c' i'
	else
	  loop c i'
      end
  in
  loop 0 0

Gluing it all together:

let compact s =
  let s_mark = mark s in
  let s_map = mapping s_mark in
  rewrite s s_mark s_map

If we take a look at the slab for our example transaction of 5 sets before compaction

Value "F"
Leaf ["f", Inner 0]
Value "D"
Leaf ["d", Inner 2; "f", Inner 0]
Value "H"
Leaf ["d", Inner 2; "f", Inner 0; "h", Inner 4]
Value "A"
Leaf ["a", Inner 6; "d", Inner 2]
Leaf ["f", Inner 0; "h", Inner 4]
Index Inner 7, ["d", Inner 8]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 4; "z", Inner 10]
Index Inner 7, ["d", Inner 11]
Commit (Inner 12)

and after compaction:

Value "F"
Value "D"
Value "H"
Value "A"
Leaf ["a", Inner 3; "d", Inner 1]
Leaf ["f", Inner 0; "h", Inner 2]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 2; "z", Inner 6]
Index Inner 4, ["d", Inner 7]
Commit (Inner 8)

We can see it’s worth the effort.
Of course, the bigger the transaction, the bigger the benefit of compaction.

Question to my 2.5 readers: if you’ve seen a paper, blog, article, … describing this, please be so kind as to send me a link.

Offline log compaction

Given Baardskeerder has transactions with slab compaction, we can build a simple and efficient offline compaction algorithm for log files. Start from the root, walk over the tree while building big transactions (as big as you want, or as big as you can afford), and add these to a new log file. When you’re done, you have your compacted log.
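
Here’s a sketch of that loop, using the DBX API from above; iter_latest, which iterates over the key/value pairs reachable from the last commit of the old log, is an assumed helper:

(* rewrite old_log into new_log in transactions of at most batch_size sets *)
let offline_compact ~iter_latest old_log new_log batch_size =
  let batch = ref [] and n = ref 0 in
  let flush () =
    if !n > 0 then begin
      let kvs = List.rev !batch in
      DBX.with_tx new_log (fun tx ->
        List.iter (fun (k, v) -> DBX.set tx k v) kvs);
      batch := [];
      n := 0
    end
  in
  iter_latest old_log (fun k v ->
    batch := (k, v) :: !batch;
    incr n;
    if !n >= batch_size then flush ());
  flush ()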

Online log compaction

Baardskeerder logs can also return garbage to the filesystem while they are in use, but that will be kept for a later post.

have fun,

Romain.

Baardskeerder is Afrikaans for barber, but also refers to an order of creatures called Solifugae.
A more marketing-prone name would have been Mjolnir Mach3 Ultra Turbo Plus Enterprise Edition with Aloe vera.

What’s in a name?


Rethinking an embedded database API

One of the projects the Incubaid research team is working on is a new embedded database (where ’embedded’ refers to the way other projects like GNU DBM, Berkeley DB, Tokyo Cabinet or SQLite are used). This project, codename Baardskeerder, is still in its infancy, and we’re toying with different approaches to various problems.

The database itself does not expose a full-fledged SQL interface like SQLite does: the interface provides the ability to store values for a given key, retrieve the value stored under a key, and some more complex operations like range lookups.

The API for a get operation seems rather obvious at first sight: you pass in a key, and the corresponding value is returned if the key has been set before, or its non-existence is signaled somehow (note: all pseudocode in this post is Haskell):

import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Char8 (ByteString)

type Key = ByteString
type Value = ByteString

data DB = DB

get :: DB -> Key -> Maybe Value
get d k = undefined

Since the database is persisted (e.g. in a file on a file system on a disk), and will most likely be used by server-style applications that return values to some client while not caring about the values themselves, this style of API has a major drawback: it forces values to be copied from kernel space into user space, after which they are written to some socket, copying them from user space back into kernel space, without any intermediate modification, or even access.

In most modern kernels, including Linux, BSD, Solaris and others, several system calls are available which allow one to send data from a file over a socket without this useless copying to and from user space. These include sendfile(2) and splice(2). We want to be able to support these optimizations from within applications embedding the database library, whilst still providing the ‘basic’ API as well.

One approach would be to implement a custom getAndSendOnSocket call in the library, but this feels very much ad-hoc. Another way to tackle this is to return a file descriptor, an offset and a length on a get call, but this doesn’t feel right, since we don’t want to leak a file descriptor to the host application as-is.

What we have now is a callback/continuation-driven approach, where a host application provides an action which will have access to the file descriptor, gets an offset and a length value, and can do whatever it feels like, whilst being wrapped inside an exception handler.

The signature of our get' action becomes

get' :: DB -> Key -> (Result -> IO a) -> IO a

Here’s a pseudocode-implementation:

import Prelude hiding (concat, length, lookup)
import Data.Binary
import Data.ByteString.Char8
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int32)
import Control.Exception (bracket, finally)
import Foreign.Marshal.Alloc
import Foreign.Marshal.Array
import Network.Socket (Socket)
import Network.Socket.ByteString
import Network.Socket.SendFile.Handle
import System.Posix (COff, Fd, fdToHandle)
import System.Posix.IO.Extra (pread)

type Length = Int32

type Key = ByteString
type Value = ByteString

data DB = DB

data Result = NotFound
            | Value Value
            | Location Fd COff Length

-- Lookup a value in the database
lookup :: DB -> Key -> IO Result
lookup d k = undefined

-- Take a reference to the database
-- This is dummy, the actual API is more complex
ref :: DB -> IO ()
ref d = undefined

-- Release a reference to the database
unref :: DB -> IO ()
unref d = undefined

get' :: DB -> Key -> (Result -> IO a) -> IO a
get' d k h = do
    v <- lookup d k

    let (pre, post) = case v of
            Location _ _ _ -> (ref, unref)
            otherwise -> (\_ -> return (), \_ -> return ())

    pre d
    h v `finally` post d

Note a Result can be NotFound, an actual value (in our case this can happen if the value is compressed or encrypted on disk), or a pointer to the value in a given file.

Implementing the classic get becomes trivial:

get :: DB -> Key -> IO (Maybe Value)
get d k = get' d k h
  where
    h :: Result -> IO (Maybe Value)
    h NotFound = return Nothing
    h (Value v) = return $ Just v
    h (Location f o l) =
        Just `fmap` bracket
            (mallocArray l')
            free
            -- TODO This assumes pread(2) always returns the requested number
            -- of bytes
            (\s -> pread f s l' o >> packCStringLen (s, l'))
      where
        l' :: Int
        l' = fromIntegral l

The implementation of a get call which sends a value on a socket, prefixed with the length of the value, becomes simple as well:

sendOnSocket :: DB -> Key -> Socket -> IO ()
sendOnSocket d k s = get' d k h
  where
    h :: Result -> IO ()
    h NotFound = sendMany s zero
    h (Value v) = do
        sendMany s $ BL.toChunks $ encode $ (fromIntegral $ length v :: Length)
        sendAll s v
    h (Location f o l) = do
        sendMany s $ BL.toChunks $ encode l
        f' <- fdToHandle f
        sendFile' s f' (fromIntegral o) (fromIntegral l)

    zero = BL.toChunks $ encode (0 :: Int32)

Instead of sendFile', some FFI binding to splice or something similar could be used as well.

In the current pseudo-implementation, a file descriptor can still be leaked (an application can call get' with an action which stores the given Fd in some IORef), and given actions can alter the file pointer using lseek(2) and others. We could tackle this, if necessary, by duplicating the file descriptor using dup(2) and passing the copy to the application, then closing the duplicate after completion (so there’s no use for the application to store the descriptor: it’ll become invalid anyway).