User functions in Arakoon

Mahomet cald the Hill to come to him. And when the Hill stood still, he was neuer a whit abashed, but said;
If the Hill will not come to Mahomet, Mahomet wil go to the hill.

Francis Bacon

Introduction

Arakoon tries to be a simple distributed key value store that favours consistency over availability.
From time to time, we get feature requests for additional commands like:

  • assert_exists: assert a value exists for the key without caring what the value actually is
  • increment_counter: increment (or create) a counter and return the new value.
  • queue operations : add an element to the front/back of a double ended queue or pop an element
  • set_and_update_X: insert a key value pair and update some non-trivial counter X (think averages, variances,…)

The list is semi-infinite and the common thing here is that they are too complex/specific/weird/… to do them in one step using the provided interface. Of course, you can do all of this on the client side, but it will cost extra network round-trips. In distributed systems, you really want to keep the number of round-trips low, which pushes you towards these kind of feature requests.

Once you decided (performance bottlenecks probably) that you need extra functionality there are two things you can do. First, you can try to force or entice us into adding them to the core interface or alternatively, you can get by using Arakoon’s “user functions”. For some reason people fear them but there’s no real technical reason to do so.

This blog post will cover two things. First we’ll go in to the nitty gritty of coding and deploying user functions and then we’ll look at some of the strategic/architectural challenges of user functions.

How do user functions work?

The high level view is this: you build a user function, and register it to an Arakoon cluster before you start it. Then, at runtime, you can call it, using any client, with a parameter (a string option) and get back a result (string option). On the server side, the master will log this in its transaction log, try to reach consensus with the slave(s) and once that is the case, the user function will be executed inside a transaction. The result of that call will be sent to the client. If an exception occurs, the transaction will be aborted. Since Arakoon logs transactions it can replay them in case of calamities. This has a very important impact: since Arakoon needs to be able to replay the execution of a user function, you cannot do side effects, use random values or read the system clock.

Running Example

We’re going to try to build a simple queue API.
It will offer named queues with 2 operations: push and pop. Also, it’s a first-in-first-out thingy.

Arakoon 1

Client side API

Arakoon 1 offers the following API for user functions.

def userFunction(self, name, argument):
'''Call a user-defined function on the server
@param name: Name of user function
@type name: string
@param argument: Optional function argument
@type argument: string option

@return: Function result
@rtype: string option
'''

Let’s take a look at it. A userFunction call needs the name, which is a string, and an argument which is a string option and returns a result of type string option. So what exactly is a string option in Python? Well, it’s either a string or None. This allows a user function to not take input or to not yield a result.

Server side API

The server side API is in OCaml, and looks like this:


class type user_db =
object
method set : string -> string -> unit
method get : string -> string
method delete: string -> unit
method test_and_set: string -> string option -> string option -> string option
method range_entries: string option -> bool -> string option -> bool -> int
-> (string * string) list
end

User functions on server side match the client’s opaque signature.

user_db -> string option -> string option

Queue’s client side

Let’s create the client side in python. We’ll create a class that uses an Arakoon client and acts as a queue. The problem with push is that we need to fit both the name and the value into the one paramater we have available. We need to do our own serialization. Let’s just be lazy (smart?) and use Arakoon’s serialization. The code is shown below.

from arakoon import Arakoon
from arakoon import ArakoonProtocol as P

class ArakoonQueue:
    def __init__(self, name, client):
        self._name = name
        self._client = client

    def push(self, value):        
        input =   P._packString(self._name) 
        input +=  P._packString(value)
        self._client.userFunction("QDemo.push", input)

    def pop(self):
        value = self._client.userFunction("QDemo.pop", self._name)
        return value



That wasn’t too hard now was it?

Queue, server side

The whole idea is that the operations happen on server side, so this will be a tat more complex.
We need to model a queue using a key value store. Code-wise, that’s not too difficult.
For each queue, we’ll keep 2 counters that keep track of both ends of the queue.

A push is merely getting the qname and the value out of the input, calculating the place where we need to store it, store the value there and update the counter for the back end of the queue. A pop is similar but when the queue becomes empty, we use the opportunity to reset the counters (maybe_reset_counters). The counter representation is a bit weird but Arakoon stores things in lexicographical order and we want to take advantage of this to keep our queue fifo. Hence, we need to make the counter in such a way the counter’s order is the same as a string’s order. The code is shown below.

(* file: plugin_qdemo.ml *)

open Registry 

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else let () = s.[last] <- Char.chr(Char.code c + 1) in 
             s
      end

let log x= 
  let k s = let s' = "[plugin_qdemo]:" ^ s in
            Lwt.ignore_result (Lwt_log.debug s')
  in
  Printf.ksprintf k x

let maybe_reset_counters user_db qname b1 = 
  let e_key = end_name qname in
  let exists = 
    try let _ = user_db # get e_key in true with Not_found -> false 
  in
  if exists
  then 
    let ev = user_db # get e_key in
    if ev = b1 then
      let b_key = begin_name qname in
      let () = user_db # set b_key zero in
      let () = user_db # set e_key zero in
      ()
    else
      ()
  else ()

let push user_db vo = 
  match vo with
    | None -> invalid_arg "push None"
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        let e_key = end_name qname in
        let b0 = 
          try user_db # get (end_name qname) 
          with Not_found -> zero 
        in
        let b1 = next_counter b0 in
        let () = user_db # set (qprefix qname b1) value in
        let () = user_db # set e_key b1 in
        None

let pop user_db vo =
  match vo with 
    | None   -> invalid_arg "pop None"
    | Some qname -> 
        let b_key = begin_name qname in
        let b0 = 
          try user_db # get (begin_name qname) 
          with Not_found -> zero
        in
        let b1 = next_counter b0 in
        try 
          let k = qprefix qname b1 in
          let v = user_db # get k in 
          let () = user_db # set b_key b1 in
          let () = user_db # delete k in
          let () = maybe_reset_counters user_db qname b1 in
          Some v
        with
          Not_found ->
            let e_key = end_name qname in
            let () = user_db # set b_key zero in
            let () = user_db # set e_key zero in
            None
              

let () = Registry.register "QDemo.push" push
let () = Registry.register "QDemo.pop" pop

The last two lines register the functions to the Arakoon cluster when the module is loaded.

Compilation

So how do you deploy your user function module into an Arakoon cluster?
First need to compile your module into something that can be dynamically loaded.
To compile the plugin_qdemo.ml I persuade ocamlbuild like this:

ocamlbuild -use-ocamlfind -tag 'package(arakoon_client)' \
-cflag -thread -lflag -thread \
plugin_qdemo.cmxs

It’s not too difficult to write your own testcase for your functionality, so you can run it outside of Arakoon and concentrate on getting the code right.

Deployment

First, you need put your compilation unit into the Arakoon home directory on all your nodes of the cluster. And second, you need to add the name to the global section of your cluster configuration. Below, I show the configuration file for my simple, single node cluster called ricky.


[global]
cluster = arakoon_0
cluster_id = ricky

### THIS REGISTERS THE USER FUNCTION:
plugins = plugin_qdemo

[arakoon_0]
ip = 127.0.0.1
client_port = 4000
messaging_port = 4010
home = /tmp/arakoon/arakoon_0

All right, that’s it. Just a big warning about user functions here.

Once a user function is installed, it needs to remain available, with the same functionality for as long as user function calls are stored inside the transaction logs, as they need to be re-evaluated when one replays a transaction log to a store (for example when a node crashed, leaving a corrupt database behind). It’s not a bad idea to include a version in the name of a user function to cater for evolution.

Demo

Let’s use it in a simple python script.

def make_client():
    clusterId = 'ricky'
    config = Arakoon.ArakoonClientConfig(clusterId,
                                         {"arakoon_0":("127.0.0.1", 4000)})
    client = Arakoon.ArakoonClient(config)
    return client

if __name__ == '__main__':
    client = make_client()
    q = ArakoonQueue("qdemo", client)
    q.push("bla bla bla")
    q.push("some more bla")
    q.push("3")
    q.push("4")
    q.push("5")
    print q.pop()
    print q.pop()
    print q.pop()
    print q.pop()

with expected results.

Arakoon 2

With Arakoon 2 we moved to Baardskeerder as a database backend, replacing the combination of transaction logs and Tokyo Cabinet. Since the backend is Lwt-aware, this means that the server side API has become too:

module UserDB :
sig
type tx = Core.BS.tx
type k = string
type v = string
val set    : tx -> k -> v -> unit Lwt.t
val get    : tx -> k -> (v, k) Baardskeerder.result Lwt.t
val delete : tx -> k -> (unit, Baardskeerder.k) Baardskeerder.result Lwt.t
end

module Registry:
sig
type f = UserDB.tx -> string option -> (string option) Lwt.t
val register: string -> f -> unit
val lookup: string -> f
end

The major changes are that

  • the api now uses Lwt
  • we have (‘a,’b) Baardskeerder.result types, which we favour over the use of exceptions for normal cases.

Rewriting the queue implementation to Arakoon 2 yields something like:

(* file: plugin_qdemo2.ml *)

open Userdb
open Lwt
open Baardskeerder

let zero = ""
let begin_name qname = qname ^ "/@begin" 
let end_name qname = qname ^ "/@end"
let qprefix qname key = qname ^ "/" ^ key

let next_counter = function
  | "" -> "A"
  | s -> 
      begin
        let length = String.length s in
        let last = length - 1 in
        let c = s.[last] in
        if c = 'H' 
        then s ^ "A"
        else let () = s.[last] <- Char.chr(Char.code c + 1) in 
             s
      end

let reset_counters tx qname =
  let b_key = begin_name qname in
  let e_key = end_name qname in
  UserDB.set tx b_key zero >>= fun () ->
  UserDB.set tx e_key zero


let maybe_reset_counters tx qname (b1:string) = 
  let e_key = end_name qname in
  begin
    UserDB.get tx e_key >>= function
      | OK  _ -> Lwt.return true
      | NOK _ -> Lwt.return false
  end >>= function
    | true ->
        begin
          UserDB.get tx e_key >>= function
            | OK ev ->
                if ev = b1 
                then reset_counters tx qname
                else Lwt.return ()
            | NOK _  -> Lwt.return ()
        end
    | false  -> Lwt.return ()

let push tx vo = 
  match vo with
    | None -> Lwt.fail (invalid_arg "push None")
    | Some v -> 
        let qname, p1 = Llio.string_from v 0 in
        let value, _ = Llio.string_from v p1 in
        Lwt_log.debug_f "push:qname=%S;value=%S" qname value >>= fun ()->
        let e_key = end_name qname in
        UserDB.get tx (end_name qname) >>= fun b0r ->
        let b0 = match b0r with
          | OK b0 -> b0
          | _     -> zero
        in
        let b1 = next_counter b0 in
        UserDB.set tx (qprefix qname b1) value >>= fun () ->
        UserDB.set tx e_key b1 >>= fun () ->
        Lwt.return None



let pop tx = function
  | None   -> Lwt.fail (invalid_arg "pop None")
  | Some qname -> 
      begin
        let b_key = begin_name qname in
        UserDB.get tx (begin_name qname) >>= fun b0r ->
        begin
          match b0r with
            | OK b0 -> Lwt.return b0
            | NOK _ -> Lwt.return zero
        end
        >>= fun b0 ->
        let b1 = next_counter b0 in
        let k = qprefix qname b1 in
        UserDB.get tx k >>= fun vr ->
        begin
          match vr with
            | OK value -> 
                begin
                  UserDB.set tx b_key b1 >>= fun () ->
                  UserDB.delete tx k >>= function 
                    | OK () ->
                        begin
                          maybe_reset_counters tx qname b1 >>= fun () ->
                          Lwt.return (Some value)
                        end
                    | NOK e -> Lwt.fail (Failure e)
                end
             | NOK _  -> 
                 reset_counters tx qname >>= fun () ->
                 Lwt.return None
        end
      end
        
let () = Userdb.Registry.register "QDemo.push" push
let () = Userdb.Registry.register "QDemo.pop" pop

Both client side and deployment remain the same.

Questions asked

Ain’t there something wrong with this Queue?

Yes! Glad you noticed. This queue concept is fundamentally broken. The problem is the pop.
Follow this scenario:

  1. the client calls the QDemo.pop function
  2. the cluster pops the value from the queue and its master sends it to the client.
  3. the client dies before it can read the popped value

Now what? We’ve lost that value. Bloody network, how dare you!

Ok, I admit this was naughty, but it’s a good example of a simple local concept that doesn’t really amount to the same thing when tried in a distributed context. When confronted with this hole, people immediately try to fix this with “Right!, so we need an extra call to …”. To which I note: “But wasn’t this extra call just the thing you were trying to avoid in the first place?”

Why don’t you allow user functions to be written in <INSERT YOUR FAVOURITE LANGUAGE HERE>?

This is a good question, and there are several answers, most of them wrong. For example, anything along the lines of “I don’t like your stinkin’ language” needs to be rejected because a language’s cuteness is irrelevant.

There are several difficulties with the idea of offering user functions to be written in another programming language. For scripting languages like Python, Lua, PHP ,… we can either implement our own interpreter and offer a subset of the language, which is a lot of work with low return on investment, or integrate an existing interpreter/runtime which will probably not play nice with Lwt, or with the OCaml runtime (garbage collector). For compiled languages we might go via the ffi but it’s still way more complex for us. So for now you’re stuck with OCaml for user functions. There are worse languages.

Wouldn’t it be better if you apply the result of the user function to the transaction log iso the arguments?

Well, we’ve been thinking about that a lot before we started with user functions. The alternative is that we record and log the effect of the user function so that we can always replay that effect later, even when the code is no longer available. It’s an intriguing alternative, but it’s not a clear improvement. It all depends on the size of the arguments versus the size of the effect.
Some user functions have a small argument set and a big effect, while for other user functions it’s the other way around.

Closing words

Technically, it’s not too difficult to hook in your own functionality into Arakoon. Just make sure the thing you want to hook in does not have major flaws.

have fun,

Romain.


Rediscovering the RSync Algorithm

A:Ok, you’re synchronizing this over the web;
and what do you use for the synchronization?

B: Oh, we implemented the rsync algorithm.
A: uhu. And what do you do with really big files?
B: The same.
A: And you also synchronise folders?
B: Yes.
A: And how do you do that?
B: we iterate over the folder, using the algorithm on every file, recursing over subfolders.
A: Can you try 2 things for me? First, a very large file; and second, a large codebase, and see if it holds.

Introduction

First of all, I am an admirer of the (original) rsync algorithm. (I think) it was a first stab at file synchronization, and it was so good people still implement it today.
But if you don’t understand its strenghts and weaknesses you might be in for a disappointment.

The Problem

You have 2 files, A’ and A that are different but alike. They reside on 2 different locations connected through a network. You want to make A’ identical to A.

The simplest solution is to just copy A, but given the similarity between the two files, there has to be a better way.

Historical Interlude

Networks used to be notoriously bad in the early 90s. Everybody who was transferring archives over large distances instinctively knew about a critical download size.
If the archive was too large, it would take too long, yielding a 100% chance something would go wrong somewhere resulting in an interrupted download. Even if the (up- or) download succeeded, chances were a small part of the file got corrupted, and you had to start over. The two first alleviations to this problem were checksums to detect accidental corruptions, and resumptions (being able to start a download at a certain offset).

RSync took care of interrupted downloads, and also provided a better solution when your file was corrupt. On top of that, it allowed low cost propagation of small changes, opening up a whole new range of applications. System administrators had a shiny new hammer.

The RSync Strategy

RSync just does a single round trip. First it creates a signature of A’, sends it over. On the other location it scans the local file, tries to find parts that are in the signature, while constructing a recipe as a stream of instructions. It’s possible to derive the algorithm starting from a primitive version, improving it step by step.
Since it’s fun too, I’ll be doing that here. While we’re playing, we’ll abstract away from IO, because it clouds the algorithmical view.

Mark 0

Let’s attack this in pure caveman style. Making a signature is splitting the file in blocks of equal size (except maybe the last). Iterating over the blocks, you calculate a digest and accumulate digests and block identifiers. Block identifiers are just their number: the first block has id 0, the second block id 1 aso.

let file_signature f b_size = 
  let f_len = String.length f in
  let rec loop acc s i =
    if s = f_len 
    then acc
    else 
      let b_len = min b_size (f_len - s) in
      let b = String.sub f s b_len in
      let b_sig = block_signature b in
      let acc' = (b_sig,i) :: acc in
      loop acc' (s+b_len) (i+1)
  in
  loop [] 0 0

We have lots of choice to calculate a block signature. Let’s be lazy and pick Digest.string which is the md5 checksum of the block.

let block_signature block = Digest.string block

To recreate the file you need to interprete the stream of instructions. But what would these instructions be?
Well, in this version, you can be told to copy over a block or write a char.

type instruction = 
  | C  of char
  | B  of int

Ok, how do you combine the signature together with the new file to generate a stream of instructions?
First thing that comes to mind is to scan over the new file, starting at position s

  • consider the block starting at s and try to find it in the signature.
  • if you find it, add a B j instruction, and jump a block forward.
  • if you miss it, add a C c instruction, and step forward 1 position.

Let’s do that:

let updates f0_sig b_size f1 = 
  let f1_len = String.length f1 in
  let rec loop acc s = 
    if s = f1_len 
    then List.rev acc
    else
      let b_len = min b_size (f1_len - s) in
      let block = String.sub f1 s b_len in
      let u,step = 
        try 
          let d = block_signature block in
          let i = List.assoc d f0_sig in 
          (B i), b_len
        with Not_found -> (C block.[0]), 1
      in
      let acc' = u :: acc in
      loop acc' (s + step)
  in
  loop [] 0

The last step in our synchronisation scheme is to create a file using the old file,
together with the stream of instructions.

let apply old b_size ins =
  let old_len = String.length old in
  let buf = Buffer.create old_len in
  let add_block i = 
    let s = i * b_size in
    let b_len = min b_size (old_len - s) in
    let block = String.sub old s b_len in
    Buffer.add_string buf block
  in
  let add_char c = Buffer.add_char buf c in
  let () = List.iter (function 
    | B i -> add_block i
    | C c -> add_char c) 
    ins
  in
  Buffer.contents buf

So it took 60 lines of code to have a first stab at a synchronisation algorithm.
Let’s try this on an example:

let bs = 5
let remote = "aaaaabbbbbcccccdddddeeeeefffffggggghhhhhiiiiijjjjjkkk"
let mine = "aaaaabXbbbcccccddddde012"
let mine_s = file_signature mine bs

let delta = updates mine_s bs remote
let mine2 = apply mine bs delta;;


val bs : int = 5
val remote : string = "aaaaabbbbbcccccdddddeeeeefffffggggghhhhhiiiiijjjjjkkk"
val mine : string = "aaaaabXbbbcccccddddde012"

val mine_s : (Digest.t * int) list =
[("$\240\t\221\19200\222\199\2035\190|\222~#\n", 4);
("P\248M\175:m\253j\159 \201\248\239B\137B", 3);
("g\199b'k\206\208\158\228\22314\2137\209d\234", 2);
("\196\148\"\21926Lm\179V E=\201O\183,", 1);
("YO\128;8\nA9n\214=\2029P5B", 0)]

val delta : instruction list =
[B 0; C 'b'; C 'b'; C 'b'; C 'b'; C 'b'; B 2; B 3; C 'e'; C 'e'; C 'e';
C 'e'; C 'e'; C 'f'; C 'f'; C 'f'; C 'f'; C 'f'; C 'g'; C 'g'; C 'g';
C 'g'; C 'g'; C 'h'; C 'h'; C 'h'; C 'h'; C 'h'; C 'i'; C 'i'; C 'i';
C 'i'; C 'i'; C 'j'; C 'j'; C 'j'; C 'j'; C 'j'; C 'k'; C 'k'; C 'k']
val mine2 : string = "aaaaabbbbbcccccdddddeeeeefffffggggghhhhhiiiiijjjjjkkk"

Ok, it works, but how good is it?
Before I can answer this, first a note on block size. There are some ‘forces’ to be reckoned with

  • the blocksize needs to be big compared to the block signature.
  • if the blocksize is big, you will find less matches between the signature and the new file, so you need send more data back
  • if the blocksize is small, you have lots of them, meaning your signature will be bigger
    and you need an efficient lookup

The best blocksize will be depend not only on the file size, but on the actual changes.
How important is it really?
Well, let’s sync 2 images:

These 2 images are bitmaps of 5.5 MB each (stored as .bmp).
(I actually uploaded smaller versions as it seems useless to let your browser download more than 10MB for just 2 silly image samples)
I’ll sync’em using different blocksizes and see what gives.

let main () =
  let bs = int_of_string (Sys.argv.(1)) in
  let () = Printf.printf "bs=%i\n" bs in
  let remote = get_data "new_image.bmp" in
  let () = Printf.printf "remote: size=%i%!\n" (String.length remote) in
  let mine = get_data "old_image.bmp" in
  let mine_s = X.file_signature mine bs in
  let () = Printf.printf "mine_s: len=%i%!\n" (Array.length mine_s) in
  let delta = X.updates mine_s bs remote in
  let (nbs,ncs) = List.fold_left (fun(bs,cs) i ->
    match i with
      | X.B _ -> (bs+1,cs)
      | X.C _ -> (bs,cs+1)
  ) (0,0) delta 
  in
  let mine2 = X.apply mine bs delta in
  let () = Printf.printf "mine2: size=%i%!\n" (String.length mine2) in
  let () = Printf.printf "bs=%i;cs=%i\n" nbs ncs in

block size # block signatures blocks chars time
8192 704 535 1384448 71s
4096 1407 1084 1323008 92s
2048 2813 2344 960512 92s
1024 5626 4995 646144 115s
512 11251 10309 482304 172s
256 22501 20972 391424 283s
128 45001 42508 319104 537s

The first column is the block size. The second is the number of blocks in the file, the third is the number of B instructions and the fourth is the number of C instructions.
The last columns is measured execution time on my laptop. Processing time is the biggest issue. Ocaml is fast, but not fast enough to compensate for my brutal inefficiency. Imagine what it would do to a 4GB movie.

Mark 1

The problem is quickly revealed: List.assoc is not suited for long lists.
A better solution is use an array, sort it on block signature, and use binary search
(using a hashtable would be viable too).

let block_signature block = Digest.string block

let file_signature f b_size = 
  let f_len = String.length f in
  let s = ref 0 in
  let n_blocks = (f_len + b_size -1) / b_size in
  Array.init n_blocks 
    (fun i -> 
      let start = !s in
      let b_len = if start + b_size > f_len then f_len - start else b_size in
      let b = String.sub f start b_len in
      let b_sig = block_signature b in
      let () = s := start + b_len in
      (b_sig,i)
    ) 

type instruction = 
  | C  of char
  | B  of int

let updates f0_sig b_size f1 = 
  let my_cmp (s0,i0) (s1,i1) = String.compare s0 s1 in
  let () = Array.sort my_cmp f0_sig in
  let len = Array.length f0_sig in
  let rec lookup b= 
    let rec find min max = 
      let mid = (min + max) / 2 in
      let (ms,mi) = f0_sig.(mid) in
      if ms = b 
      then mi
      else if min > max then raise Not_found
      else if b > ms then find (mid+1) max
      else find min (mid -1)
    in
    find 0 (len -1)
  in
  let f1_len = String.length f1 in
  let rec loop acc s = 
    if s = f1_len 
    then List.rev acc
    else
      let b_len = min b_size (f1_len - s) in
      let block = String.sub f1 s b_len in
      let u,step = 
        try 
          let d = block_signature block in
          let i = lookup d in 
          (B i), b_len
        with Not_found -> (C block.[0]), 1
      in
      let acc' = u :: acc in
      loop acc' (s + step)
  in
  loop [] 0

let apply old b_size ins =
  let old_len = String.length old in
  let buf = Buffer.create old_len in
  let add_block i = 
    let s = i * b_size in
    let b_len = min b_size (old_len - s) in
    let block = String.sub old s b_len in
    Buffer.add_string buf block
  in
  let add_char c = Buffer.add_char buf c in
  let () = List.iter (function 
    | B i -> add_block i
    | C c -> add_char c) 
    ins
  in

block size # block signatures blocks chars time(s)
8192 704 535 1384448 41
4096 1407 1084 1323008 20
2048 2813 2344 960512 7.8
1024 5626 4995 646144 1.9
512 11251 10309 482304 1.1
256 22501 20972 391424 0.8
128 45001 42508 319104 0.9

Wow, this is quite unexpected (but we’re not complaining). So what happened? Well, the lookup was so dominating, it was cloaking all other behaviour.
Now, with the lookup out of the way, other things can be observed. The problem now is that a ‘miss’ not only causes a C instruction to be emitted, but more importantly, it causes another digest to be calculated. The less blocks are found, the higher the processing time.

Mark 2

The cost of the miss was solved by Andrew Tridgell by introducing a second, weaker hash function.
He used the Adler-32 checksum which is a rolling checksum. ‘Rolling’ means that
adler32(buffer[a+1:b+1])= cheap(adler32(buffer[a:b]), which makes it suitable for our problem here. The ocaml standard library does not have an adler32 module, so I hacked one up.
It’s a naieve implementation in that it follows the definition closely. In fact, most of the modulo operations can be avoided by doing some extra administration.
I didn’t bother.

module Adler32 = struct
  type t = {mutable a: int; mutable b: int; mutable c: int}
      
  let padler = 65521
    
  let make () = {a = 1 ;b = 0; c = 0}
    
  let from buf offset length = 
    
    let too_far = offset + length in
    let rec loop a b i= 
      if i = too_far 
      then {a; b; c = length}
      else (* modulo can be lifted with a bit of math *)
        let a' = (a + Char.code(buf.[i])) mod padler in
        let b' = (b + a') mod padler in
        loop a' b' (i+1)
    in
    loop 1 0 offset
    
  let reset t = t.a <- 1;t.b <- 0; t.c <- 0
    
  let digest t = (t.b lsl 16) lor t.a 
    
  let out t c1 = 
    let x1 = Char.code c1 in
    t.a <- (t.a - x1) mod padler;
    t.b <- (t.b - t.c  * x1 -1) mod padler;
    t.c <- t.c - 1

  let rotate t c1 cn = 
    let up x = if x >= 0 then x else x + padler in
    let x1 = Char.code c1 in
    let xn = Char.code cn in
    t.a <- up ((t.a - x1 + xn) mod padler);
    t.b <- let f = (t.c * x1) mod padler in
           let r = (t.b - f + t.a -1) mod padler in 
           up r
             
  let update t buf offset length = 
    let too_far = offset + length in 
    let rec loop i = 
      if i = too_far then () 
      else
        let x = Char.code buf.[i] in
        let () = t.a <- (t.a + x) mod padler  in
        let () = t.b <- (t.b + t.a) mod padler in
        let () = t.c <- t.c + 1 in
        loop (i +1)
    in
    loop offset
      
  let string block = 
    let t = from block 0 (String.length block) in
    digest t
end

Adler32 is much weaker than md5 and using it exposes you to collisions. So in fact, it acts as a cheap test that might yield false positives. That’s the reason the rsync algo keeps both: if the adler32 of the buffer is found in the signature, there is a second check to see if the md5s match. The fact one sometimes needs to rotate the checksum and at other times needs to reinitialize if from a part of the buffer, complicates the calculation of the updates a bit.

The code is shown below.

let updates f0_sig b_size f1 = 
  let my_cmp (wh0,sh0,i0) (wh1, sh1,i1) = compare wh0 wh1 in
  let () = Array.sort my_cmp f0_sig in
  let len = Array.length f0_sig in
  let rec lookup w = 
    let rec find min max = 
      let mid = (min + max) / 2 in
      let (mw, ms,mi) = f0_sig.(mid) in
      if mw = w
      then (ms,mi)
      else if min > max then raise Not_found
      else if w > mw then find (mid+1) max
      else find min (mid -1)
    in
    find 0 (len -1)
  in
  let f1_len = String.length f1 in
  let weak = Adler32.from f1 0 b_size in
  let rec loop acc b e = 
    if b = f1_len 
    then List.rev acc
    else
      let wh = Adler32.digest weak in
      let step_1 () = 
        let bc = f1.[b] in
        let a = C bc in
        let b' = b + 1 in
        if e +1 < f1_len 
        then 
          let e' = e + 1 in
          let ec = f1.[e] in
          let () = Adler32.rotate weak bc ec in
          loop (a:: acc) b' e'
        else
          let e' = e in
          let () = Adler32.out weak bc in
          loop (a:: acc) b' e'
      in
      try
        let (s0,i0) = lookup wh in
        let sh = Digest.substring f1 b (e - b) in
        if s0 = sh 
        then
          let b' = e in
          let e' = min f1_len (e + b_size) in
          let a = B i0 in
          let () = Adler32.reset weak in
          let () = Adler32.update weak f1 b' (e' - b') in
          loop (a :: acc) b' e'
        else step_1 ()
      with Not_found -> step_1 ()
  in
  loop [] 0 b_size

The code indeed is a bit messier as we have more things to control at the same time, but it’s still quite small. Let’s see how wel it performs:

block size # block signatures blocks chars time(s)
8192 704 535 1384448 0.9
4096 1407 1084 1332008 0.9
2048 2813 2344 960512 0.8
1024 5626 4995 646114 0.6
512 11251 10309 482304 0.6
256 22501 20401 537600 0.7
128 45001 42508 319104 0.7

This almost completely removes the cost of a miss; at least for things of this size. The choice of blocksize does however affect the amount of data you need to send over.
There is a lot of other things you can do here:

  • Block Size Heuristic: something like O(sqrt(file_size))
  • SuperInstructions: make instructions for consecutive Blocks, and consecutive Chars
  • Emit function: don’t accumulate the updates, but emit them (using a callback function)
  • Smaller signatures: you can consider to drop some bytes from the strong hash
  • IO
  • Compress update stream

The last remaining problem is that some modifications are not handled well by the algorithm (for example 1 byte changed in each block).
Maybe you could try a better algorithm.
There are lot’s of them out there, and they are worth checking out. (Before you know it you’ll be dabling into merkle trees or set reconciliation )

Anyway, I already exceeded my quotum for this post, but I still want to say a little thing about synchronisation of folders

The Next Problem

You have 2 trees of files, A’ and A that are different but alike. They reside on 2 different locations connected through a network. You want to make A’ identical to A.

What Not To Do

Don’t walk the folder and ‘rsync’ each file you encounter.
A small calculation will show you how bad it really is.
Suppose you have 20000 files, each 1KB. Suppose 1 rsync costs you about 0.1s
(reading the file, sending over the signature, building the stream of updates, applying them).
This costs you about 2000s or more than half an hour. System administrators know better:
they would not hesitate: “tar the tree, sync the tars, and untar the synced tar”.
Suppose each of the actions takes 5s (overestimating) you’re still synced in 15s.
Or maybe you can try unison. It’s ocaml too, you know.

have fun,

Romain.


Cloud Search Using Suffix Arrays ? Well, … maybe.

Suffix Arrays are arrays that allow you to find exact substring matches. The core idea is that you generate a sorted array of positions using a comparison function that compares the suffixes starting at the respective positions.

Constructing a suffix array

It’s one of the cases where a few lines of code is more clarifying than the explanation itself.

def make_suffix_array(text):
    size = len(text)
    def compare_suffix(i,j):
        return cmp(text[i:],text[j:])
    
    indexes = range(size)
    indexes.sort(cmp = compare_suffix)
    return indexes

If you try this on the string “Suffix Arrays Rock?” and print out the sorted array’s indexes along with the suffixes that start there, you start to see both potential, as well as weaknesses.

pos suffix
06 ‘ Arrays Rock?’
13 ‘ Rock?’
18 ‘?’
07 ‘Arrays Rock?’
14 ‘Rock?’
00 ‘Suffix Arrays Rock?’
10 ‘ays Rock?’
16 ‘ck?’
02 ‘ffix Arrays Rock?’
03 ‘fix Arrays Rock?’
04 ‘ix Arrays Rock?’
17 ‘k?’
15 ‘ock?’
09 ‘rays Rock?’
08 ‘rrays Rock?’
12 ‘s Rock?’
01 ‘uffix Arrays Rock?’
05 ‘x Arrays Rock?’
11 ‘ys Rock?’

Note: This method of creating suffix arrays is criminally inefficient, but it suffices for explaining the concept. There are algorithms to do this in linear time.

You can use the suffix array to search for a string in the text using binary search. Something like this:

def find(sa, text, q):
    size = len(sa)
    qsize = len(q)
    hi = size -1
    lo = 0
    while hi >= lo:
        mid = (hi + lo) / 2
        begin = sa[mid]
        end = min(size, begin + qsize)
        test = text[begin: end]
        if test > q:
            hi = mid -1
        elif test < q:
            lo = mid + 1
        else:
            return begin
    return None

You have a fast search that allows you to return the exact position of the substring.
Even better: all matches are clustered together in the suffix array. Perfect locality.

Multiple documents

The concept easily extendeds itself to multiple documents.

def make_multi(texts):
    indexes = []
    nt = len(texts)
    for d in xrange(nt): 
        size = len(texts[d])
        for i in xrange(size):
            e = d,i
            indexes.append(e)

    def compare(e0,e1):
        d0, i0 = e0
        d1, i1 = e1
        s0 = texts[d0][i0:]
        s1 = texts[d1][i1:]
        return cmp(s0,s1)

    indexes.sort(cmp = compare)
    return indexes

A minimal example with 3 very small texts, and a bit of code to dump the suffix array, together with the suffixes.

def print_multi(sam, texts):
    for e in sam:
        d, i = e
        suffix = texts[d][i:]
        print "(%2i,%2i)\t%s" % (d,i,suffix)

texts = ["Suffix Arrays Rock?",
         "Redundant Array of Inexpensive Disks",
         "Metal is not Rock!"]
r0 = make_multi(texts)
print_multi(r0, texts)

yields:

( 1, 9)	 Array of Inexpensive Disks
( 0, 6)	 Arrays Rock?
( 1,30)	 Disks
( 1,18)	 Inexpensive Disks
( 2,12)	 Rock!
( 0,13)	 Rock?
( 2, 5)	 is not Rock!
( 2, 8)	 not Rock!
( 1,15)	 of Inexpensive Disks
...
( 1,28)	ve Disks
( 0, 5)	x Arrays Rock?
( 1,22)	xpensive Disks
( 1,14)	y of Inexpensive Disks
( 0,11)	ys Rock?

Ok, this seems to have almost perfect scalability properties. What’s the catch?

The catch

There are some severe downsides to Suffix Arrays.

  • Text needed during search
    A major disadvantage to suffix arrays is that you need access to the text while you do a search. It means that the suffix array and the text need to be kept close to each other.
  • Storage Needs
    If you think of it, a text is an array of chars (ascii, not unicode). Its suffix array is an array of positions. Suppose you store a position and document ids as a 32bit words. Remember you need to store the text, as well as the suffix array. If the text size is n, the total needs are 9n. To make matters worse: since you’ll be jumping around in the suffix array, as well as the text, compression schemes are not really feasible.
  • Updates?
    Even the slightest update to the text will invalidate the suffix array, and you need to reconstruct it. Even worse, if you have an array over multiple documents, an update in one of the documents will invalidate the entire structure.
  • Exact Matches
    You get what you asked for. Searching the example text for ‘suffix’ will not yield a result.
  • Overkill
    You can search for an exact match on a string of arbitrary length, but nobody needs to find exact matches on large strings. There is one notable exception: If you’re trying to look up DNA fragments in a large DNA string, this is exactly what you need. For everybody else, it’s overkill.

Getting better all the time

Well, it couldn’t get much worse now could it?
It doesn’t need to be this bad, and you can actually pick how much pain you want to inflict on yourself.

  • search starts at start of words
    Most of the time, it’s not needed to be able to search for substrings of words. For example, you probably don’t need to find the ‘rays’ in ‘arrays’. It divides the number of positions to sort by the average word size.
  • remove language noise
  • Not all words need to be considered. In fact, the most frequent words are noise.

  • old versions can be blacklisted
    If all versions of all your documents have a unique id, you can blacklist older unwanted document ids, and filter them out before you return the results. This means you don’t have to do updates immediately, but you can schedule them when it’s cheapest for you.
  • text size is not document size
    Suppose your documents are pdfs. The actual size of the text is small compared to all the other information in the file (markup, images, fonts, …). Moreover, storing text will not eat your storage. For example, a heavyweight (in every way) novel like War And Peace takes about 3.2 MB. The entire corpus of english wikipedia articles, which is stored as xml is just over 30 GB. peanuts.
  • multiple suffix arrays
    You don’t need add all your documents to one big suffix array. You can have many of them wich improves the situation in more than one way. Search can be parallel, and adding a new document is cheaper as well.

What does this have to do with ‘The Cloud’ ?

Since there is a network between the client issuing the request and the cloud infrastructure offering the search service, means you have a non-neglectable latency.
This means the latency added by the service just needs to be small compared to the overall latency, which makes our life a bit easier.

Also, the nature of ‘The Cloud’ pushes people to store consolidated instead of live data, which is exactly what is best suited for suffix arrays.
Another prerequisite for suffix arrays is the availability of the text during search, which is exactly what happens in a cloud setup.

Closing words

Suffix arrays are an idea that was conceived back when ‘online’ still had other semantics. At the time (1991) they were seen as too expensive for general search, but that was back when ‘big hard disks’ had less than 100MB of capacity. Maybe Suffix Arrays deserve a second look.

I can feel the urge to whip up an email search/indexing service in a few hundred lines of code, or a private wikipedia search service.
Just one question remains: what shall it be? ocaml or haskell? hm, tough one.

have fun,

Romain.