User functions in Arakoon

Mahomet cald the Hill to come to him. And when the Hill stood still, he was neuer a whit abashed, but said;
If the Hill will not come to Mahomet, Mahomet wil go to the hill.

Francis Bacon

Introduction

Arakoon tries to be a simple distributed key value store that favours consistency over availability.
From time to time, we get feature requests for additional commands like:

  • assert_exists: assert a value exists for the key without caring what the value actually is
  • increment_counter: increment (or create) a counter and return the new value.
  • queue operations: add an element to the front/back of a double ended queue or pop an element
  • set_and_update_X: insert a key value pair and update some non-trivial counter X (think averages, variances,…)

The list is semi-infinite, and what these requests have in common is that they are too complex/specific/weird/… to do in one step using the provided interface. Of course, you can do all of this on the client side, but it will cost extra network round-trips. In distributed systems, you really want to keep the number of round-trips low, which pushes you towards these kinds of feature requests.

Once you have decided (probably because of performance bottlenecks) that you need extra functionality, there are two things you can do. First, you can try to force or entice us into adding it to the core interface; alternatively, you can get by using Arakoon’s “user functions”. For some reason people fear them, but there’s no real technical reason to do so.

This blog post will cover two things. First we’ll go into the nitty-gritty of coding and deploying user functions, and then we’ll look at some of the strategic/architectural challenges of user functions.

How do user functions work?

The high level view is this: you build a user function, and register it to an Arakoon cluster before you start it. Then, at runtime, you can call it, using any client, with a parameter (a string option) and get back a result (string option). On the server side, the master will log this in its transaction log, try to reach consensus with the slave(s) and once that is the case, the user function will be executed inside a transaction. The result of that call will be sent to the client. If an exception occurs, the transaction will be aborted. Since Arakoon logs transactions it can replay them in case of calamities. This has a very important impact: since Arakoon needs to be able to replay the execution of a user function, you cannot do side effects, use random values or read the system clock.
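To make the replay constraint concrete, here is a minimal Python sketch. It is a toy model, not Arakoon code: the names `registry`, `apply_log` and `Demo.incr` are made up for illustration, and a plain dict stands in for the store.

```python
# Toy model of replaying logged user function calls; not Arakoon code.

def incr(db, arg):
    """Deterministic: the outcome depends only on the store and the argument."""
    new = int(db.get(arg, "0")) + 1
    db[arg] = str(new)
    return str(new)

registry = {"Demo.incr": incr}  # hypothetical function name


def apply_log(log):
    """Rebuild a store from scratch by re-running every logged call in order."""
    db = {}
    for name, arg in log:
        registry[name](db, arg)
    return db


log = [("Demo.incr", "hits"), ("Demo.incr", "hits"), ("Demo.incr", "misses")]

# Two independent replays end in the same state, so a node that lost its
# database can be rebuilt from the log alone.
assert apply_log(log) == apply_log(log) == {"hits": "2", "misses": "1"}
```

A function that consulted the clock or a random number generator could leave a different store behind on every replay, which is exactly what the restriction rules out.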

Running Example

We’re going to try to build a simple queue API.
It will offer named queues with 2 operations: push and pop. Also, it’s a first-in-first-out thingy.

Arakoon 1

Client side API

Arakoon 1 offers the following API for user functions.

def userFunction(self, name, argument):
    '''Call a user-defined function on the server
    @param name: Name of user function
    @type name: string
    @param argument: Optional function argument
    @type argument: string option

    @return: Function result
    @rtype: string option
    '''

Let’s take a look at it. A userFunction call needs the name, which is a string, and an argument which is a string option and returns a result of type string option. So what exactly is a string option in Python? Well, it’s either a string or None. This allows a user function to not take input or to not yield a result.

Server side API

The server side API is in OCaml, and looks like this:


class type user_db =
object
  method set : string -> string -> unit
  method get : string -> string
  method delete: string -> unit
  method test_and_set: string -> string option -> string option -> string option
  method range_entries: string option -> bool -> string option -> bool -> int
    -> (string * string) list
end

User functions on server side match the client’s opaque signature.

user_db -> string option -> string option

Queue’s client side

Let’s create the client side in Python. We’ll create a class that uses an Arakoon client and acts as a queue. The problem with push is that we need to fit both the name and the value into the one parameter we have available. We need to do our own serialization. Let’s just be lazy (smart?) and use Arakoon’s serialization. The code is shown below.

from arakoon import Arakoon
from arakoon import ArakoonProtocol as P

class ArakoonQueue:
    def __init__(self, name, client):
        self._name = name
        self._client = client

    def push(self, value):        
        input =   P._packString(self._name) 
        input +=  P._packString(value)
        self._client.userFunction("QDemo.push", input)

    def pop(self):
        value = self._client.userFunction("QDemo.pop", self._name)
        return value



That wasn’t too hard now was it?
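For readers who want to see what packing two strings into one argument amounts to, here is a self-contained sketch. The layout (a 4-byte little-endian length followed by the bytes) is an assumption made for illustration; the real format is whatever `P._packString` on the client and `Llio.string_from` on the server agree on. The helper names `pack_string` and `unpack_string` are hypothetical.

```python
import struct


def pack_string(s: bytes) -> bytes:
    """Length-prefix a byte string (assumed layout: 4-byte little-endian length)."""
    return struct.pack("<I", len(s)) + s


def unpack_string(buf: bytes, pos: int = 0):
    """Return (string, next_position), so several strings can be read in sequence."""
    (n,) = struct.unpack_from("<I", buf, pos)
    start = pos + 4
    return buf[start:start + n], start + n


# Two strings travel as a single argument, and come back out in order.
payload = pack_string(b"qdemo") + pack_string(b"bla bla bla")
qname, p1 = unpack_string(payload, 0)
value, _ = unpack_string(payload, p1)
assert (qname, value) == (b"qdemo", b"bla bla bla")
```

Returning the next position from the reader is the same shape the server-side code relies on when it reads the queue name first and the value second.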

Queue, server side

The whole idea is that the operations happen on server side, so this will be a tad more complex.
We need to model a queue using a key value store. Code-wise, that’s not too difficult.
For each queue, we’ll keep 2 counters that keep track of both ends of the queue.

A push merely gets the qname and the value out of the input, calculates the place where the value needs to be stored, stores it there, and updates the counter for the back end of the queue. A pop is similar, but when the queue becomes empty, we use the opportunity to reset the counters (maybe_reset_counters). The counter representation is a bit weird, but Arakoon stores things in lexicographical order and we want to take advantage of this to keep our queue FIFO. Hence, we need to build the counters in such a way that their counting order is the same as their order as strings. The code is shown below.

(* file: plugin_qdemo.ml *)

open Registry 

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

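let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else
          (* work on a copy: strings are immutable by default since OCaml 4.06 *)
          let b = Bytes.of_string s in
          let () = Bytes.set b last (Char.chr (Char.code c + 1)) in 
          Bytes.to_string b
      end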

let log x= 
  let k s = let s' = "[plugin_qdemo]:" ^ s in
            Lwt.ignore_result (Lwt_log.debug s')
  in
  Printf.ksprintf k x

let maybe_reset_counters user_db qname b1 = 
  let e_key = end_name qname in
  let exists = 
    try let _ = user_db # get e_key in true with Not_found -> false 
  in
  if exists
  then 
    let ev = user_db # get e_key in
    if ev = b1 then
      let b_key = begin_name qname in
      let () = user_db # set b_key zero in
      let () = user_db # set e_key zero in
      ()
    else
      ()
  else ()

let push user_db vo = 
  match vo with
    | None -> invalid_arg "push None"
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        let e_key = end_name qname in
        let b0 = 
          try user_db # get (end_name qname) 
          with Not_found -> zero 
        in
        let b1 = next_counter b0 in
        let () = user_db # set (qprefix qname b1) value in
        let () = user_db # set e_key b1 in
        None

let pop user_db vo =
  match vo with 
    | None   -> invalid_arg "pop None"
    | Some qname -> 
        let b_key = begin_name qname in
        let b0 = 
          try user_db # get (begin_name qname) 
          with Not_found -> zero
        in
        let b1 = next_counter b0 in
        try 
          let k = qprefix qname b1 in
          let v = user_db # get k in 
          let () = user_db # set b_key b1 in
          let () = user_db # delete k in
          let () = maybe_reset_counters user_db qname b1 in
          Some v
        with
          Not_found ->
            let e_key = end_name qname in
            let () = user_db # set b_key zero in
            let () = user_db # set e_key zero in
            None
              

let () = Registry.register "QDemo.push" push
let () = Registry.register "QDemo.pop" pop

The last two lines register the functions to the Arakoon cluster when the module is loaded.
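The key layout and the counter scheme can be checked in isolation. The following Python sketch models the same logic with a plain dict standing in for user_db; it is a simplified model for experimentation, not a port that can be loaded into Arakoon.

```python
# In-memory model of the queue layout; a plain dict stands in for user_db.

def next_counter(s):
    """Successor whose string order matches counting order: A..H, HA..HH, HHA, ..."""
    if s == "":
        return "A"
    if s[-1] == "H":
        return s + "A"
    return s[:-1] + chr(ord(s[-1]) + 1)


def push(db, qname, value):
    e_key = qname + "/@end"
    b1 = next_counter(db.get(e_key, ""))
    db[qname + "/" + b1] = value
    db[e_key] = b1


def pop(db, qname):
    b_key, e_key = qname + "/@begin", qname + "/@end"
    b1 = next_counter(db.get(b_key, ""))
    k = qname + "/" + b1
    if k not in db:
        db[b_key] = db[e_key] = ""  # queue is empty: reset both counters
        return None
    v = db.pop(k)
    db[b_key] = b1
    if db.get(e_key) == b1:         # that was the last element: reset counters
        db[b_key] = db[e_key] = ""
    return v


# The counters sort in the order in which they were generated.
counters, c = [], ""
for _ in range(40):
    c = next_counter(c)
    counters.append(c)
assert counters == sorted(counters)

db = {}
for item in ["bla bla bla", "some more bla", "3"]:
    push(db, "qdemo", item)
popped = [pop(db, "qdemo") for _ in range(4)]
assert popped == ["bla bla bla", "some more bla", "3", None]
```

Note that the counters grow by one character every eight or so pushes, which is why resetting them whenever the queue drains is worthwhile.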

Compilation

So how do you deploy your user function module into an Arakoon cluster?
First, you need to compile your module into something that can be dynamically loaded.
To compile the plugin_qdemo.ml I persuade ocamlbuild like this:

ocamlbuild -use-ocamlfind -tag 'package(arakoon_client)' \
-cflag -thread -lflag -thread \
plugin_qdemo.cmxs

It’s not too difficult to write your own testcase for your functionality, so you can run it outside of Arakoon and concentrate on getting the code right.

Deployment

First, you need to put your compilation unit into the Arakoon home directory on all the nodes of your cluster. Second, you need to add its name to the global section of your cluster configuration. Below, I show the configuration file for my simple, single node cluster called ricky.


[global]
cluster = arakoon_0
cluster_id = ricky

### THIS REGISTERS THE USER FUNCTION:
plugins = plugin_qdemo

[arakoon_0]
ip = 127.0.0.1
client_port = 4000
messaging_port = 4010
home = /tmp/arakoon/arakoon_0

All right, that’s it. Just a big warning about user functions here.

Once a user function is installed, it needs to remain available, with the same functionality for as long as user function calls are stored inside the transaction logs, as they need to be re-evaluated when one replays a transaction log to a store (for example when a node crashed, leaving a corrupt database behind). It’s not a bad idea to include a version in the name of a user function to cater for evolution.

Demo

Let’s use it in a simple python script.

def make_client():
    clusterId = 'ricky'
    config = Arakoon.ArakoonClientConfig(clusterId,
                                         {"arakoon_0":("127.0.0.1", 4000)})
    client = Arakoon.ArakoonClient(config)
    return client

if __name__ == '__main__':
    client = make_client()
    q = ArakoonQueue("qdemo", client)
    q.push("bla bla bla")
    q.push("some more bla")
    q.push("3")
    q.push("4")
    q.push("5")
    print q.pop()
    print q.pop()
    print q.pop()
    print q.pop()

with expected results.

Arakoon 2

With Arakoon 2 we moved to Baardskeerder as a database backend, replacing the combination of transaction logs and Tokyo Cabinet. Since the backend is Lwt-aware, the server side API has become Lwt-aware too:

module UserDB :
sig
  type tx = Core.BS.tx
  type k = string
  type v = string
  val set    : tx -> k -> v -> unit Lwt.t
  val get    : tx -> k -> (v, k) Baardskeerder.result Lwt.t
  val delete : tx -> k -> (unit, Baardskeerder.k) Baardskeerder.result Lwt.t
end

module Registry:
sig
  type f = UserDB.tx -> string option -> (string option) Lwt.t
  val register: string -> f -> unit
  val lookup: string -> f
end

The major changes are that

  • the api now uses Lwt
  • we have ('a, 'b) Baardskeerder.result types, which we favour over the use of exceptions for normal cases.

Rewriting the queue implementation to Arakoon 2 yields something like:

(* file: plugin_qdemo2.ml *)

open Userdb
open Lwt
open Baardskeerder

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

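let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else
          (* work on a copy: strings are immutable by default since OCaml 4.06 *)
          let b = Bytes.of_string s in
          let () = Bytes.set b last (Char.chr (Char.code c + 1)) in 
          Bytes.to_string b
      end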

let reset_counters tx qname =
  let b_key = begin_name qname in
  let e_key = end_name qname in
  UserDB.set tx b_key zero >>= fun () ->
  UserDB.set tx e_key zero


let maybe_reset_counters tx qname (b1:string) = 
  let e_key = end_name qname in
  begin
    UserDB.get tx e_key >>= function
      | OK  _ -> Lwt.return true
      | NOK _ -> Lwt.return false
  end >>= function
    | true ->
        begin
          UserDB.get tx e_key >>= function
            | OK ev ->
                if ev = b1 
                then reset_counters tx qname
                else Lwt.return ()
            | NOK _  -> Lwt.return ()
        end
    | false  -> Lwt.return ()

let push tx vo = 
  match vo with
    | None -> Lwt.fail (Invalid_argument "push None")
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        Lwt_log.debug_f "push:qname=%S;value=%S" qname value >>= fun ()->
        let e_key = end_name qname in
        UserDB.get tx (end_name qname) >>= fun b0r ->
        let b0 = match b0r with
          | OK b0 -> b0
          | _     -> zero
        in
        let b1 = next_counter b0 in
        UserDB.set tx (qprefix qname b1) value >>= fun () ->
        UserDB.set tx e_key b1 >>= fun () ->
        Lwt.return None



let pop tx = function
  | None   -> Lwt.fail (Invalid_argument "pop None")
  | Some qname -> 
      begin
        let b_key = begin_name qname in
        UserDB.get tx (begin_name qname) >>= fun b0r ->
        begin
          match b0r with
            | OK b0 -> Lwt.return b0
            | NOK _ -> Lwt.return zero
        end
        >>= fun b0 ->
        let b1 = next_counter b0 in
        let k = qprefix qname b1 in
        UserDB.get tx k >>= fun vr ->
        begin
          match vr with
            | OK value -> 
                begin
                  UserDB.set tx b_key b1 >>= fun () ->
                  UserDB.delete tx k >>= function 
                    | OK () ->
                        begin
                          maybe_reset_counters tx qname b1 >>= fun () ->
                          Lwt.return (Some value)
                        end
                    | NOK e -> Lwt.fail (Failure e)
                end
             | NOK _  -> 
                 reset_counters tx qname >>= fun () ->
                 Lwt.return None
        end
      end
        
let () = Userdb.Registry.register "QDemo.push" push
let () = Userdb.Registry.register "QDemo.pop" pop

Both client side and deployment remain the same.

Questions asked

Ain’t there something wrong with this Queue?

Yes! Glad you noticed. This queue concept is fundamentally broken. The problem is the pop.
Follow this scenario:

  1. the client calls the QDemo.pop function
  2. the cluster pops the value from the queue and its master sends it to the client.
  3. the client dies before it can read the popped value

Now what? We’ve lost that value. Bloody network, how dare you!

Ok, I admit this was naughty, but it’s a good example of a simple local concept that doesn’t really amount to the same thing when tried in a distributed context. When confronted with this hole, people immediately try to fix this with “Right!, so we need an extra call to …”. To which I note: “But wasn’t this extra call just the thing you were trying to avoid in the first place?”

Why don’t you allow user functions to be written in <INSERT YOUR FAVOURITE LANGUAGE HERE>?

This is a good question, and there are several answers, most of them wrong. For example, anything along the lines of “I don’t like your stinkin’ language” needs to be rejected because a language’s cuteness is irrelevant.

There are several difficulties with the idea of allowing user functions to be written in another programming language. For scripting languages like Python, Lua, PHP, … we can either implement our own interpreter and offer a subset of the language, which is a lot of work with low return on investment, or integrate an existing interpreter/runtime, which will probably not play nice with Lwt or with the OCaml runtime (garbage collector). For compiled languages we might go via the FFI, but it’s still way more complex for us. So for now you’re stuck with OCaml for user functions. There are worse languages.

Wouldn’t it be better to record the result of the user function in the transaction log instead of the arguments?

Well, we’ve been thinking about that a lot before we started with user functions. The alternative is that we record and log the effect of the user function so that we can always replay that effect later, even when the code is no longer available. It’s an intriguing alternative, but it’s not a clear improvement. It all depends on the size of the arguments versus the size of the effect.
Some user functions have a small argument set and a big effect, while for other user functions it’s the other way around.
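The trade-off is easy to see with a toy example. The sketch below is illustrative only: `bump_all` is a made-up user function and the "log entries" are plain Python values, not Arakoon's log format.

```python
# Toy comparison of what would end up in the log; names are illustrative only.

def bump_all(db, prefix):
    """Small argument, potentially large effect: touches every key under prefix."""
    effect = {}
    for k in sorted(db):
        if k.startswith(prefix):
            effect[k] = str(int(db[k]) + 1)
    db.update(effect)
    return effect


db = {"cnt/%03d" % i: "0" for i in range(100)}

# Logging the call: the function name plus a four-character argument.
arg_entry = ("bump_all", "cnt/")

# Logging the effect: every key/value pair the call wrote.
effect_entry = bump_all(dict(db), "cnt/")

assert len(arg_entry[1]) == 4
assert len(effect_entry) == 100
```

For the queue's push it is the other way around: the argument already contains the whole value, so logging the effect would cost about the same and would remove the need to keep the code around.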

Closing words

Technically, it’s not too difficult to hook in your own functionality into Arakoon. Just make sure the thing you want to hook in does not have major flaws.

have fun,

Romain.


Announcing Baardskeerder

We’re happy to announce the public availability of Baardskeerder, our implementation of a Copy-On-Write (append-only) B-tree-ish embedded database, after approval by our CEO. The source is available on GitHub as part of the Incubaid organization at https://github.com/Incubaid/baardskeerder, under the LGPL-3 license.

This is a technology preview. Baardskeerder is still under heavy development, and breaking changes will occur before a first stable release is made.

After experiencing performance issues when storing “large” (> 8kb) values in TokyoCabinet, the storage back-end used by our distributed key-value store Arakoon, the Baardskeerder project was initiated. Work has been started to integrate Baardskeerder in Arakoon as well.

If you’re interested in the name of the project, take a look at Wikipedia.


Baardskeerder’s transaction strategy

Baardskeerder is a simple embedded database based around an append-only B-treeish datastructure.
It’s a dictionary that also supports range queries and transactions.
Baardskeerder is implemented in OCaml, and the main idea is that it will replace Tokyo Cabinet in our Arakoon Key-Value Store.
This post will try to explain our approach regarding transactions.

Preliminaries

Baardskeerder appends slabs to a log. Slabs are lists of entries, and entries can be
either values, leaves, indexes, or commits. Suppose we start with an empty log, and add a first update (set “f” “F”).
Baardskeerder builds a slab that looks like this:

Value "F"
Leaf ["f", Inner 0]
Commit (Inner 1)

This slab gets serialized, and added to a log, which looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)

Notice there are Outer and Inner references. Outer ones refer to positions in the log, while Inner ones refer to positions in the current slab. During serialization, each inner position needs to be translated into an outer one, while outer positions can just be copied. (In reality, Outer positions are a bit more complex to calculate as the size of entries after serialization influences them, but I’ve abstracted away from that problem here.)

When a slab is ready to be serialized and added, it always ends with a Commit node. This commit node is necessary as there might be a calamity while writing out the serialized slab and it might not end up entirely in the log. So after a successful update, the last entry in the log is a Commit pointing to the current root node.
To fully appreciate the difference between slab and log references,
let’s add another update (set “d” “D”).
The new slab looks like this:

Value "D"
Leaf ["d", Inner 0; "f", Outer 0]
Commit (Inner 1)

The new leaf in the slab refers both to the value at position 0 in the log (Value “F”) and to the value at position 0 in the slab (Value “D”).
After serialization of the slab, and writing it to the log, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)
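The translation step can be sketched in a few lines of Python. This is a toy model under the same simplification as the listings: positions are entry indexes rather than byte offsets, and entries are plain tuples. The names `to_outer` and `serialize` are made up for illustration.

```python
# Toy serialization: positions are list indexes rather than byte offsets.

def to_outer(pos, base):
    """Inner n becomes Outer (base + n); Outer positions are copied as they are."""
    kind, n = pos
    return ("Outer", base + n) if kind == "Inner" else pos


def serialize(slab, log):
    """Append slab to log, turning every Inner reference into an Outer one."""
    base = len(log)  # where the slab's entry 0 will land in the log
    for entry in slab:
        tag = entry[0]
        if tag == "Value":
            log.append(entry)
        elif tag == "Leaf":
            log.append(("Leaf", [(k, to_outer(p, base)) for k, p in entry[1]]))
        elif tag == "Index":
            log.append(("Index", to_outer(entry[1], base),
                        [(k, to_outer(p, base)) for k, p in entry[2]]))
        elif tag == "Commit":
            log.append(("Commit", to_outer(entry[1], base)))
    return log


log = []
serialize([("Value", "F"),
           ("Leaf", [("f", ("Inner", 0))]),
           ("Commit", ("Inner", 1))], log)
serialize([("Value", "D"),
           ("Leaf", [("d", ("Inner", 0)), ("f", ("Outer", 0))]),
           ("Commit", ("Inner", 1))], log)

# Same result as the six-entry log listed above.
assert log[4] == ("Leaf", [("d", ("Outer", 3)), ("f", ("Outer", 0))])
assert log[5] == ("Commit", ("Outer", 4))
```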

Let’s add some more updates to have a more interesting log:

 set "h" "H";
 set "a" "A";
 set "z" "Z"
 

After which, the log looks like this:

Value "F"
Leaf ["f", Outer 0]
Commit (Outer 1)
Value "D"
Leaf ["d", Outer 3; "f", Outer 0]
Commit (Outer 4)
Value "H"
Leaf ["d", Outer 3; "f", Outer 0; "h", Outer 6]
Commit (Outer 7)
Value "A"
Leaf ["a", Outer 9; "d", Outer 3]
Leaf ["f", Outer 0; "h", Outer 6]
Index Outer 10, ["d", Outer 11]
Commit (Outer 12)
Value "Z"
Leaf ["f", Outer 0; "h", Outer 6; "z", Outer 14]
Index Outer 10, ["d", Outer 15]
Commit (Outer 16)

This corresponds to the following tree:
[Figure: tree after 5 sets]

There are several little things to notice here. First, leaves that overflow (size = 4) are split into two new leaves with an Index entry to refer to them both. Also, the entries in the log only refer to previous entries, and, most importantly, nothing ever changes inside the log.

Transactions

Suppose that, by some accident, the process dies while writing a slab, so the last slab doesn’t quite make it to the log in full. In the above example, the log might end just after the value “Z” (or a bit further, with half a leaf written).
The next time the log is opened, we search for the last commit, and the log gets truncated just after position 13. As such, it is the presence of Commit entries that makes updates atomic.
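A minimal sketch of that recovery step, again with entries as tuples in a list. The function name `recover` is hypothetical, and a real implementation also has to cope with an entry that was only half written, which this model ignores.

```python
def recover(log):
    """Drop everything after the last Commit entry."""
    for i in range(len(log) - 1, -1, -1):
        if log[i][0] == "Commit":
            return log[:i + 1]
    return []  # no commit at all: nothing was ever written completely


# Positions 0..13 of the example log, reduced to their tags, followed by a
# partially written slab (the value "Z" and a leaf, but no commit).
tags = ["Value", "Leaf", "Commit", "Value", "Leaf", "Commit", "Value", "Leaf",
        "Commit", "Value", "Leaf", "Leaf", "Index", "Commit", "Value", "Leaf"]
log = [(t,) for t in tags]

recovered = recover(log)
assert len(recovered) == 14          # positions 0..13 survive
assert recovered[-1] == ("Commit",)  # and the log ends with a commit again
```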

They also provide a means to implement non-concurrent transactions. In fact, if we only have a commit entry at the end of a slab, the slab literally is the transaction. At the beginning of the transaction, we create a slab, and we pass it along for all updates. A commit entry is written at the end of the slab, when we’re done. The only added complexity comes from descending the tree:
We must start at the root in the slab (or in the log if the slab is empty), and whenever we have to jump to an entry, we look at its position: we jump into the slab if it’s an inner position, and into the log if it’s an outer one. (How to descend a tree is described in more detail in a previous blog post.)
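Here is a rough Python sketch of such a lookup. It makes two assumptions that are inferred from the example listings rather than taken from the Baardskeerder sources: the newest entry in an open slab is the current root, and in `Index p0, [k, p]` the child `p0` covers keys up to and including `k`. All function names are made up for illustration.

```python
# Toy lookup across an open slab and the log; positions are list indexes.

def O(n): return ("Outer", n)
def I(n): return ("Inner", n)


def deref(pos, slab, log):
    """Inner positions index the slab, Outer positions index the log."""
    kind, n = pos
    return slab[n] if kind == "Inner" else log[n]


def find_root(slab, log):
    if slab:   # assumption: the newest slab entry is the current root
        return slab[-1]
    if log:    # otherwise the last Commit in the log names the root
        return deref(log[-1][1], slab, log)
    return None


def get(key, slab, log):
    node = find_root(slab, log)
    while node is not None:
        if node[0] == "Leaf":
            for k, p in node[1]:
                if k == key:
                    return deref(p, slab, log)[1]  # payload of the Value entry
            return None
        child = node[1]            # Index: start with the leftmost child
        for sep, p in node[2]:
            if key > sep:          # assumption: separator is the left child's max key
                child = p
        node = deref(child, slab, log)
    return None


log = [
    ("Value", "F"), ("Leaf", [("f", O(0))]), ("Commit", O(1)),
    ("Value", "D"), ("Leaf", [("d", O(3)), ("f", O(0))]), ("Commit", O(4)),
    ("Value", "H"), ("Leaf", [("d", O(3)), ("f", O(0)), ("h", O(6))]), ("Commit", O(7)),
    ("Value", "A"), ("Leaf", [("a", O(9)), ("d", O(3))]),
    ("Leaf", [("f", O(0)), ("h", O(6))]), ("Index", O(10), [("d", O(11))]),
    ("Commit", O(12)),
]
# A transaction in progress: set "z" "Z" has been applied but not committed.
slab = [
    ("Value", "Z"),
    ("Leaf", [("f", O(0)), ("h", O(6)), ("z", I(0))]),
    ("Index", O(10), [("d", I(1))]),
]

assert get("z", slab, log) == "Z"   # found through the slab
assert get("a", slab, log) == "A"   # slab root points back into the log
assert get("z", [], log) is None    # without the slab, "z" does not exist yet
```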

Our transactional API is simple and clean:

 module DBX :
  sig
    type tx
    val get : tx -> k -> v
    val set : tx -> k -> v -> unit
    val delete : tx -> k -> unit

    ...

    val with_tx : log -> (tx -> unit) -> unit
  end

The following code fragment illustrates our transaction api:

let xs = ["f","F";
	  "d","D";
	  "h","H";
	  "a","A";
	  "z","Z";
	 ]
in
DBX.with_tx log
  (fun tx ->
    List.iter (fun (k,v) ->
      DBX.set tx k v) xs
  )

Suppose we apply this transaction to an empty log; we then end up with the following:

Value "F"
Leaf ["f", Outer 0]
Value "D"
Leaf ["d", Outer 2; "f", Outer 0]
Value "H"
Leaf ["d", Outer 2; "f", Outer 0; "h", Outer 4]
Value "A"
Leaf ["a", Outer 6; "d", Outer 2]
Leaf ["f", Outer 0; "h", Outer 4]
Index Outer 7, ["d", Outer 8]
Value "Z"
Leaf ["f", Outer 0; "h", Outer 4; "z", Outer 10]
Index Outer 7, ["d", Outer 11]
Commit (Outer 12)

Performance Summary: each transaction boils down to 1 slab, and each slab is written using 1 system call (write).

Slab Compaction

Batching updates in transactions uses less space compared to individual updates, but we still have a lot of dead entries. Take for example the log above. Entries 1, 3, 5, 8, and 9 are not useful in any way. So it would be desirable to prune them from the slab before they even hit the log.
This turns out to be surprisingly easy.

First we have to find the dead entries, which is done with a simple iteration through the slab, starting from the root, and marking every Inner position as known. When you’re done,
the positions that you didn’t mark are garbage.

type t = { mutable es: entry array; mutable nes: int}

let mark slab =
  let r = Array.make (slab.nes) false in
  let maybe_mark = function
    | Outer _ -> ()
    | Inner x -> r.(x) <- true
  in
  let maybe_mark2 (_,p) = maybe_mark p in
  let mark _ e =
    match e with
      | NIL | Value _ -> ()
      | Commit p -> maybe_mark p
      | Leaf l -> List.iter maybe_mark2 l
      | Index (p0,kps) -> let () = maybe_mark p0 in List.iter maybe_mark2 kps
  in
  let () = iteri_rev slab mark in
  let () = r.(slab.nes -1) <- true in
  r

Once we know which entries are dead, we iterate through the slab again, from position 0 towards the end, and build a mapping that tells us how to translate an old Inner position to a new Inner position.

let mapping mark =
  let s = Array.length mark in
  let h = Hashtbl.create s in
  let rec loop i o =
    if i = s then h
    else
      let v = mark.(i) in
      let i' = i + 1 in
      let () = Hashtbl.add h i o in
      let o' = if v then o + 1 else o in
      loop i' o'
  in
  loop 0 0

The last phase is the actual rewriting of the slab, without the dead entries. This again is a simple iteration.

let rewrite s s_mark s_map =
  let lookup_pos = function
    | Outer x -> Outer x
    | Inner x -> Inner (Hashtbl.find s_map x)
  in
  let rewrite_leaf kps       = List.map (fun (k,p) -> (k,lookup_pos p)) kps in
  let rewrite_index (p0,kps) = (lookup_pos p0 , rewrite_leaf kps) in
  let rewrite_commit p       = lookup_pos p in
  let esa = s.es in
  let size = s.nes in
  let r = Array.make size NIL in
  let rec loop c i =
    if i = size
    then { es = r; nes = c}
    else
      begin
	let i' = i + 1 in
	let a = s_mark.(i) in
	if a then
	  let e = esa.(i) in
	  let e' = match e with
	    | Leaf  l  -> Leaf (rewrite_leaf l)
	    | Index i  -> Index (rewrite_index i)
	    | Commit p -> Commit (rewrite_commit p)
	    | Value _
	    | NIL -> e
	  in
	  let () = r.(c) <- e' in
	  let c' = c + 1 in
	  loop c' i'
	else
	  loop c i'
      end
  in
  loop 0 0

Gluing it all together:

let compact s =
  let s_mark = mark s in
  let s_map = mapping s_mark in
  rewrite s s_mark s_map

If we take a look at the slab for our example transaction of 5 sets before compaction

Value "F"
Leaf ["f", Inner 0]
Value "D"
Leaf ["d", Inner 2; "f", Inner 0]
Value "H"
Leaf ["d", Inner 2; "f", Inner 0; "h", Inner 4]
Value "A"
Leaf ["a", Inner 6; "d", Inner 2]
Leaf ["f", Inner 0; "h", Inner 4]
Index Inner 7, ["d", Inner 8]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 4; "z", Inner 10]
Index Inner 7, ["d", Inner 11]
Commit (Inner 12)

and after compaction:

Value "F"
Value "D"
Value "H"
Value "A"
Leaf ["a", Inner 3; "d", Inner 1]
Leaf ["f", Inner 0; "h", Inner 2]
Value "Z"
Leaf ["f", Inner 0; "h", Inner 2; "z", Inner 6]
Index Inner 4, ["d", Inner 7]
Commit (Inner 8)

We can see it’s worth the effort.
Of course, the bigger the transaction, the bigger the benefit of compaction.
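For readers who prefer to experiment, the three phases can be modelled in Python as follows. This is a sketch that mirrors the OCaml above on tuple-shaped entries; it is not the Baardskeerder code, and the helper names are made up.

```python
def I(n): return ("Inner", n)


def refs(entry):
    """All positions an entry points at."""
    tag = entry[0]
    if tag == "Leaf":
        return [p for _, p in entry[1]]
    if tag == "Index":
        return [entry[1]] + [p for _, p in entry[2]]
    if tag == "Commit":
        return [entry[1]]
    return []


def compact(slab):
    # Phase 1: mark every Inner position some entry refers to, plus the commit.
    keep = [False] * len(slab)
    for e in slab:
        for kind, n in refs(e):
            if kind == "Inner":
                keep[n] = True
    keep[-1] = True
    # Phase 2: map each old position to its new position.
    new_pos, o = {}, 0
    for i, k in enumerate(keep):
        new_pos[i] = o
        o += 1 if k else 0
    # Phase 3: copy the kept entries, rewriting their Inner references.
    def fix(p):
        return I(new_pos[p[1]]) if p[0] == "Inner" else p
    out = []
    for e, k in zip(slab, keep):
        if not k:
            continue
        if e[0] == "Leaf":
            e = ("Leaf", [(key, fix(p)) for key, p in e[1]])
        elif e[0] == "Index":
            e = ("Index", fix(e[1]), [(key, fix(p)) for key, p in e[2]])
        elif e[0] == "Commit":
            e = ("Commit", fix(e[1]))
        out.append(e)
    return out


slab = [
    ("Value", "F"), ("Leaf", [("f", I(0))]),
    ("Value", "D"), ("Leaf", [("d", I(2)), ("f", I(0))]),
    ("Value", "H"), ("Leaf", [("d", I(2)), ("f", I(0)), ("h", I(4))]),
    ("Value", "A"), ("Leaf", [("a", I(6)), ("d", I(2))]),
    ("Leaf", [("f", I(0)), ("h", I(4))]), ("Index", I(7), [("d", I(8))]),
    ("Value", "Z"), ("Leaf", [("f", I(0)), ("h", I(4)), ("z", I(10))]),
    ("Index", I(7), [("d", I(11))]),
    ("Commit", I(12)),
]
compacted = compact(slab)
assert len(compacted) == 10
assert compacted[4] == ("Leaf", [("a", I(3)), ("d", I(1))])
assert compacted[8] == ("Index", I(4), [("d", I(7))])
assert compacted[9] == ("Commit", I(8))
```

The result matches the ten-entry listing above, including the leaf for "f" and "h": it survives because the old index at position 9 still refers to it, even though that index is itself dropped. A mark phase that only followed references from entries already known to be reachable from the commit would presumably remove that leaf as well.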

Question to my 2.5 readers: If you’ve seen a paper, blog, article,… describing this, please be so kind to send me a link.

Offline log compaction

Given Baardskeerder has transactions with slab compaction, we can build a simple and efficient offline compaction algorithm for log files. Start from the root, walk over the tree while building big transactions (as big as you want, or as big as you can afford), and add these to a new log file. When you’re done, you have your compacted log.
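The walking and batching part of that idea can be sketched as follows. The model only covers reading the old log and grouping the live key/value pairs; writing each batch into the new log (with something like `DBX.with_tx`) is left out. The function names are hypothetical, and the log is the example log of five sets, with positions as list indexes.

```python
def O(n): return ("Outer", n)


def walk(log, pos=None):
    """Yield (key, value) pairs in key order, starting from the last commit's root."""
    if pos is None:
        if not log:
            return
        pos = log[-1][1]       # the final Commit points at the root
    node = log[pos[1]]
    if node[0] == "Leaf":
        for k, p in node[1]:
            yield k, log[p[1]][1]
    else:                      # Index: leftmost child first, then the others
        yield from walk(log, node[1])
        for _, p in node[2]:
            yield from walk(log, p)


def batches(pairs, size):
    """Group pairs into lists of at most `size`, one list per transaction."""
    batch = []
    for kv in pairs:
        batch.append(kv)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


log = [
    ("Value", "F"), ("Leaf", [("f", O(0))]), ("Commit", O(1)),
    ("Value", "D"), ("Leaf", [("d", O(3)), ("f", O(0))]), ("Commit", O(4)),
    ("Value", "H"), ("Leaf", [("d", O(3)), ("f", O(0)), ("h", O(6))]), ("Commit", O(7)),
    ("Value", "A"), ("Leaf", [("a", O(9)), ("d", O(3))]),
    ("Leaf", [("f", O(0)), ("h", O(6))]), ("Index", O(10), [("d", O(11))]),
    ("Commit", O(12)),
    ("Value", "Z"), ("Leaf", [("f", O(0)), ("h", O(6)), ("z", O(14))]),
    ("Index", O(10), [("d", O(15))]), ("Commit", O(16)),
]

live = list(walk(log))
assert live == [("a", "A"), ("d", "D"), ("f", "F"), ("h", "H"), ("z", "Z")]
assert [len(b) for b in batches(live, 2)] == [2, 2, 1]
```

Only five of the eighteen entries carry live values here; the rest is history that a compacted log would not need to repeat.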

Online log compaction

Baardskeerder logs can also return garbage to the filesystem while they are in use, but that will be kept for a later post.

have fun,

Romain.

Baardskeerder is Afrikaans for barber, but also refers to an order of creatures called Solifugae.
A more marketing-prone name would have been Mjolnir Mach3 Ultra Turbo Plus Enterprise Edition with Aloe vera.

What’s in a name?


Rethinking an embedded database API

One of the projects the Incubaid research team is working on is a new embedded database (where ‘embedded’ refers to the way other projects like GNU DBM, Berkeley DB, Tokyo Cabinet or SQLite are used). This project, codename Baardskeerder, is still in its infancy, and we’re toying with different approaches to various problems.

The database itself does not expose a full-fledged SQL interface like SQLite does: the interface provides the ability to store values for a given key, retrieve the value stored under a key, and some more complex operations like range lookups.

The API for a get operation seems rather obvious on first sight: you pass in a key, and the corresponding value is returned if the key has been set before, or the non-existence is signaled somehow (note: all pseudocode in this post is Haskell):

import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Char8 (ByteString)

type Key = ByteString
type Value = ByteString

data DB = DB

get :: DB -> Key -> Maybe Value
get d k = undefined

Since the database is persisted (e.g. in a file on a file system on a disk), and will most likely be used by server-style applications, returning values to some client, whilst the server application doesn’t care about the values themselves, this style of API has a major drawback: it forces copying values from kernelspace into userspace, after which the value is written to some socket, copying it from userspace back into kernelspace, without any intermediate modifications, or even access.

In most modern kernels, including Linux, BSD, Solaris and others, there are several system calls available which allow one to send data from files on a socket, without this useless copying to and from userspace. These include sendfile(2) and splice(2). We want to be able to support these optimizations from within applications embedding the database library, whilst still providing the ‘basic’ API as well.

One approach would be to implement a custom getAndSendOnSocket call in the library, but this feels very much ad-hoc. Another way to tackle this is to return a file descriptor, an offset and a length on a get call, but this doesn’t feel right, since we don’t want to leak a file descriptor to the host application as-is.

What we have now is a callback/continuation-driven approach, where a host application can provide an action which will have access to the file descriptor, get an offset and length value, and can do whatever it feels like, whilst being wrapped inside an exception handler.

The signature of our get’ action becomes

get' :: DB -> Key -> (Result -> IO a) -> IO a

Here’s a pseudocode-implementation:

import Prelude hiding (concat, length, lookup)
import Data.Binary
import Data.ByteString.Char8
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int32)
import Control.Exception (bracket, finally)
import Foreign.Marshal.Alloc
import Foreign.Marshal.Array
import Network.Socket (Socket)
import Network.Socket.ByteString
import Network.Socket.SendFile.Handle
import System.Posix (COff, Fd, fdToHandle)
import System.Posix.IO.Extra (pread)

type Length = Int32

type Key = ByteString
type Value = ByteString

data DB = DB

data Result = NotFound
            | Value Value
            | Location Fd COff Length

-- Lookup a value in the database
lookup :: DB -> Key -> IO Result
lookup d k = undefined

-- Take a reference to the database
-- This is dummy, the actual API is more complex
ref :: DB -> IO ()
ref d = undefined

-- Return a reference to the database
unref :: DB -> IO ()
unref d = undefined

get' :: DB -> Key -> (Result -> IO a) -> IO a
get' d k h = do
    v <- lookup d k

    let (pre, post) = case v of
            Location _ _ _ -> (ref, unref)
            _ -> (\_ -> return (), \_ -> return ())

    pre d
    h v `finally` post d

Note a Result can be NotFound, an actual value (in our case this can happen if the value is compressed or encrypted on disk), or a pointer to the value in a given file.

Implementing the classic get becomes trivial:

get :: DB -> Key -> IO (Maybe Value)
get d k = get' d k h
  where
    h :: Result -> IO (Maybe Value)
    h NotFound = return Nothing
    h (Value v) = return $ Just v
    h (Location f o l) =
        Just `fmap` bracket
            (mallocArray l')
            free
            -- TODO This assumes pread(2) always returns the requested number
            -- of bytes
            (\s -> pread f s l' o >> packCStringLen (s, l'))
      where
        l' :: Int
        l' = fromIntegral l

The implementation of a get call which sends a value on a socket, prefixed with the length of the value, becomes simple as well:

sendOnSocket :: DB -> Key -> Socket -> IO ()
sendOnSocket d k s = get' d k h
  where
    h :: Result -> IO ()
    h NotFound = sendMany s zero
    h (Value v) = do
        sendMany s $ BL.toChunks $ encode $ (fromIntegral $ length v :: Length)
        sendAll s v
    h (Location f o l) = do
        sendMany s $ BL.toChunks $ encode l
        f' <- fdToHandle f
        sendFile' s f' (fromIntegral o) (fromIntegral l)

    zero = BL.toChunks $ encode (0 :: Int32)

Instead of sendFile’, some FFI binding to splice or something similar could be used as well.
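The same shape can be tried out in Python, whose standard library exposes the zero-copy path through `socket.sendfile` (which uses `os.sendfile` where available and falls back to plain sends otherwise). The sketch below is a loose model, not the database's API: the in-memory `index` mapping keys to `(offset, length)` is hypothetical, only the not-found and location cases are modelled, and the length prefix is assumed to be a 4-byte big-endian integer.

```python
import socket
import struct
import tempfile


def get_with(db_file, index, key, handler):
    """Continuation-style get: handler receives None or (file, offset, length)."""
    loc = index.get(key)
    if loc is None:
        return handler(None)
    offset, length = loc
    return handler((db_file, offset, length))


def send_on_socket(db_file, index, key, sock):
    def handler(result):
        if result is None:
            sock.sendall(struct.pack(">i", 0))
            return
        f, offset, length = result
        sock.sendall(struct.pack(">i", length))        # length prefix
        sock.sendfile(f, offset=offset, count=length)  # no copy through our buffers
    get_with(db_file, index, key, handler)


with tempfile.TemporaryFile() as f:
    f.write(b"xxxHELLOyyy")
    f.flush()
    index = {"k": (3, 5)}  # hypothetical index: key -> (offset, length)
    a, b = socket.socketpair()
    send_on_socket(f, index, "k", a)
    a.close()
    received = b.makefile("rb").read()
    b.close()

assert received == struct.pack(">i", 5) + b"HELLO"
```

The handler never reads the value into application memory; it only passes the file, offset and length along, which is the point of the exercise.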

In the current pseudo-implementation, a file descriptor can still be leaked (an application can call get’ with an action which stores the given Fd in some IORef), and given actions can alter the file pointer using lseek(2) and others. We could tackle this, if necessary, by duplicating the file descriptor using dup(2) and passing the copy to the application, then closing the duplicate after completion (so there’s no use for the application to store the descriptor: it’ll become invalid anyway).
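A small Python sketch of the duplicate-then-close idea, using `os.dup` and `os.pread` on a POSIX system. The wrapper `with_duplicate` and the misbehaving `nosy_action` are made up for illustration.

```python
import os
import tempfile


def with_duplicate(fd, action):
    """Run action on a duplicate of fd and close the duplicate afterwards."""
    dup_fd = os.dup(fd)
    try:
        return action(dup_fd)
    finally:
        os.close(dup_fd)


leaked = []


def nosy_action(fd):
    leaked.append(fd)            # an application trying to hold on to the descriptor
    return os.pread(fd, 7, 0)    # positional read: does not touch the file pointer


with tempfile.TemporaryFile() as f:
    f.write(b"payload")
    f.flush()
    data = with_duplicate(f.fileno(), nosy_action)
    # The stored descriptor is useless once the action has returned.
    try:
        os.pread(leaked[0], 7, 0)
        still_valid = True
    except OSError:
        still_valid = False

assert data == b"payload"
assert not still_valid
```

One caveat worth keeping in mind: descriptors created with dup(2) share their file offset with the original, so duplication deals with the leak but does not by itself stop an action from moving the file pointer. Sticking to positional reads such as pread(2) inside the library sidesteps that.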