Caulking your distributed algorithm implementation

Me: Ok, all unit tests succeed.
All system tests succeed.
All acceptance tests succeed.
Release!
(some time later)
NN: We have a problem with your distributed database.
Me: Ok, what’s the matter?
NN: Once in a while the cluster seems to get stuck.
Me: Stuck how?
NN: It seems to be unable to elect a master.
Me: What did you do to get there?
NN: We don’t know, but we have the logs. Here they are. Happy hunting.
Me: $#%#&*!
(some time later)
Me: Aha, that’s what went wrong!

Introduction

The Arakoon team has spent a big part of the last two years on variations of this theme. Some of the problems turned out to be configuration problems (sometimes even of other clusters), but there definitely were situations where the issue was caused by an implementation error. As it turns out, these all have the same cause: not all possible scenarios were covered. So, is it possible to escape this vicious circle?

We have already learned some things (see this previous blog post): we use asynchronous message passing and have removed all IO from our implementation. So what’s left?
Well, we have a set of distributed objects with well defined states and a set of messages that can be exchanged between them. Can’t we just generate all possible states of the distributed system and all possible messages, and check the relationships between them? This is non-trivial, as the number of possible states is infinite and so is the number of possible messages. On the other hand, most state combinations are bad and inconsistent, and most messages are irrelevant… There might be something we can do.

Growing a state space-ish diagram

What if we start from a consistent starting state for all objects (let’s call this the system state), generate all relevant messages that can be exchanged from that state, and apply them in all possible orders? The system states that can be reached in this way from the starting state should all be consistent. If we find a state that’s not consistent, we stop; for consistent system states, we iterate. What about inconsistent states? Well, clearly this means our algorithm is capable of doing bad things. We should check the scenario that produced it, fix it, and iterate again. Is this doable? Well, maybe… and what about simulating dropped messages?

Humble beginnings

Let’s start small. What about growing a Basic Paxos implementation for a system of three nodes? Modelling the messages should not be too difficult:

type 'v kind = 
  | Prepare   
  | Promise of 'v option
  | Accept  of 'v
  | Accepted  

type 'v message = {n:n; s:id; t:id; k:'v kind}


Each message is associated with a paxos round n, has a source s and a target t, and has semantics described by its kind k. Finally, there’s some value type ('v) for the things the system should try to find consensus on. (You can find the code on GitHub.)

Modelling the agents is a bit more work:

type 'v agent_state = 
  | SHalted 
  | SIdle 
  | SRequest  of ('v proposal)
  | SPromised of 'v option
  | SLead of ('v * int) (* value, outstanding acks *)


type 'v agent = { 
  id : id;
  pop : id list;
  _n : n;
  state : 'v agent_state;
  store : (n * 'v) list;
}

An agent has an id, knows the other agents (pop), has a store and a current paxos round n. The interesting part is the inner state representing the role it’s playing: it can be halted or idle, requesting to become a leader for a proposal, having made a promise, or leading an update.

Now that we have messages and agents, we can model the state of the system.

type 'v network = 'v message list
type 'v agents  = 'v agent list
type 'v state   = { net: 'v network; ags: 'v agents}


The system state is merely a collection of agents (ags) and a network (net) representing the messages that can be delivered to the agents.

How can the state of the system change? First of all, the delivery of a message most likely will have an impact. We might add other moves later.

type 'v move = 
  | DeliverMsg of 'v message

Generating all moves is now easy: for every message in the network, there is a move that delivers it. Since we want to stop in bad states, we don’t generate any moves there:

let generate_moves state = 
  if is_bad state 
  then []
  else
    let deliver = List.map (fun x -> DeliverMsg x) state.net in
    ...

How about executing a move? There is some administration to do there.
We have a move to execute, a current state, the path that was followed to arrive at that state, and a set of observed transitions. If the move is the delivery of a message, we find the target agent and let it handle the message. This changes the agent’s state and produces some messages (extra).

 let execute_move move state path observed = 

    let deliver m ag = 
      let agent = find_agent ag m.t in
      let agent', extra = A.handle_message agent m in
      let ag' = replace_agent agent' ag in
      ag', extra
    in
    
    let state' = 
      match move with
        | DeliverMsg m -> 
          let net' = remove m state.net in
          let ags',extra = deliver m state.ags in
          { net = net' @ extra; ags = ags'}
    in

Executing a move yields a new system state. We record the transition from the old state to the new one by this move, and extend the path.

    let observed' =
      let transition = (state_label state, move, state_label state') in
      TSet.add transition observed in
    state' , (move,state) :: path , observed'

The whole simulator is a few functions away:

  let rec check_all level state path observed = 
    if not (is_consistent state) then raise (Bad (path,state));
    if level = limit
    then observed
    else
      let rec loop observed = function
        | [] -> observed
        | move :: rest ->
          let state', path', observed' = execute_move move state path observed in
          let observed'' = check_all (level + 1) state' path' observed' in
          loop observed'' rest
      in
      let moves = generate_moves state in
      loop observed moves

  let run state0 = check_all 0 state0 [] TSet.empty 

Basically we start from an initial state, go down the tree of possible moves, execute all these and accumulate observed transitions.
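The loop is easy to try out on something much smaller than Paxos. Below is a minimal, runnable sketch of the same idea on a made-up toy system (the state record, the Deliver move and the invariant "message 2 must not arrive before message 1" are all hypothetical, not part of the post's code). Like check_all, it stops at the first inconsistent state and raises Bad with the path that led there; to stay short, it counts the transitions it takes instead of collecting them in a TSet.

```ocaml
(* Hypothetical toy system: messages are plain ints and the single
   "agent" only records the order in which it received them. *)
type state = { net : int list; received : int list } (* newest first *)
type move = Deliver of int

exception Bad of move list * state

(* Toy invariant: message 2 must never arrive before message 1. *)
let is_consistent s =
  not (List.mem 2 s.received && not (List.mem 1 s.received))

(* One move per message in flight; no moves at all in a bad state. *)
let generate_moves s =
  if is_consistent s then List.map (fun m -> Deliver m) s.net else []

(* Delivering removes the message from the network and records it. *)
let execute_move (Deliver m) s =
  { net = List.filter (fun x -> x <> m) s.net; received = m :: s.received }

(* Depth-first over every delivery order, bounded by [limit] levels.
   Returns the number of transitions taken; raises Bad on the first
   inconsistent state, with the path (most recent move first). *)
let rec check_all limit level s path count =
  if not (is_consistent s) then raise (Bad (path, s));
  if level = limit then count
  else
    List.fold_left
      (fun count mv ->
        check_all limit (level + 1) (execute_move mv s) (mv :: path) (count + 1))
      count (generate_moves s)
```

Starting with messages 1 and 2 in flight, the order 1 then 2 is explored without complaint, but delivering 2 first raises Bad with the path [Deliver 2].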

Generating a diagram is trivial with Graphviz: just iterate over the observed transitions. (This is not shown here; see GitHub for the details.)
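For completeness, a possible shape of such a helper. This is a sketch, not the post's dottify: it assumes the transitions have already been turned into (source label, move label, target label) string triples, and dot_of_transitions is a hypothetical name. Setting rankdir=LR makes Graphviz lay the states out from left to right.

```ocaml
(* Hypothetical helper: render (from, move, to) label triples as a
   Graphviz digraph. %S quotes each label, which is fine for the plain
   ASCII labels used here (it escapes double quotes and backslashes). *)
let dot_of_transitions (transitions : (string * string * string) list) =
  let b = Buffer.create 256 in
  Buffer.add_string b "digraph states {\n";
  Buffer.add_string b "\trankdir=\"LR\";\n";
  List.iter
    (fun (src, label, dst) ->
      Printf.bprintf b "\t%S -> %S [label=%S];\n" src dst label)
    transitions;
  Buffer.add_string b "}\n";
  Buffer.contents b
```

With the simulator, one would first turn the elements of the observed set into such triples, for instance with TSet.elements if TSet is built with Set.Make (an assumption, since its definition is not shown).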

The simulation

We create 3 agents, let the first one start on a value, run our simulator from the starting state, and dottify the observed transitions.

let main () = 
  let ids = List.map id [1;2;3] in
  let a1,a1_out = start (make_agent (Id 1) ids) "x" in
  let a2,a2_out = (make_agent (Id 2) ids),[] in
  let a3,a3_out = (make_agent (Id 3) ids),[] in
  let world = [a1;a2;a3] in
  let state0 = {net = a1_out @ a2_out @ a3_out; ags = world} in
  try
    let observed = M.run state0 in
    M.dottify state0 observed 
  with (M.Bad (path, state)) ->
    Printf.eprintf "bad path:\n";
    M.dump_path path;
    Printf.eprintf "%s\n" (state_label state)


Mark0

Let’s try this on a brain-dead simple implementation of an agent.
One that goes to the halted state as soon as it receives a message, while sending out no message at all.

module Mark0 = (struct
  let handle_message agent message = halt agent, []

end : ALGO)


What do we see here? First, there is a labelling scheme for states: R1I0I0 means the first agent has n=1 and is in the Requesting state, while the second and third agents are in the Idle state with n=0.
After the delivery of the {Prepare;1->2;n=1} message, a prepare from agent 1 to agent 2, the second agent halts. Likewise for the other prepare message. This looks ok, so let’s move on.
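The labelling scheme itself fits in a few lines. A minimal sketch, assuming a simplified view of an agent as a (role, n) pair (the role type and this state_label are hypothetical stand-ins for the post's agent_state and state_label); the letters are the ones that show up in this post's labels: H(alted), I(dle), R(equest), P(romised) and L(ead).

```ocaml
(* Simplified, hypothetical role type mirroring agent_state. *)
type role = Halted | Idle | Request | Promised | Lead

let letter = function
  | Halted -> "H" | Idle -> "I" | Request -> "R" | Promised -> "P" | Lead -> "L"

(* One letter plus round number per agent, concatenated in agent order. *)
let state_label (agents : (role * int) list) =
  String.concat "" (List.map (fun (r, n) -> letter r ^ string_of_int n) agents)
```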

Mark1

Let’s build an agent implementation that covers the happy path.

module Mark1 = (struct

  let prepare_when_idle source n agent= 
    let an = agent._n in
    if n > an 
    then
      let pv = None in
      let agent' = {agent with state = SPromised pv; _n = n;} in
      let msg = {n = n;s = agent.id;t = source; k = Promise pv } in
      let out = [msg] in
      agent', out
    else
      halt agent,[]
    
  let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent = 
    let pv,pballot = proposal in
    if mn = agent._n
    then
      let pballot' = Ballot.register_vote pballot source vo in
      if Ballot.have_majority pballot' 
      then
        let value = Ballot.pick_value pballot' in
        let outstanding_acks = Ballot.quorum pballot' -1 in
        let me = agent.id in
        let targets = others agent in
        let make_msg t = {n = mn; s = me; t ; k =  Accept value} in
        let broadcast = List.map make_msg targets in
        let agent' = { agent with 
            store = (mn,value) :: agent.store;
            state = SLead (value, outstanding_acks);
        }
        in
        agent', broadcast
      else
        agent, []
    else
      halt agent, []
        
  let handle_accept m v agent = 
    match agent.state with
      | SPromised vo when agent._n = m.n -> 
        let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
        let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
        agent', out
      | _ -> halt agent, []
    
      
  let handle_accepted m agent =
    match agent.state with
      | SLead (v,out) when agent._n = m.n -> 
        let out' = out -1 in
        let state' = if out' = 0 then SIdle else SLead(v,out') in
        {agent with state = state'},[]          
      | _ -> halt agent, []


  let handle_message agent m = 
    match m.k with
      | Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
      | Promise vo -> 
        begin
          match agent.state with 
            | SRequest p -> promise_for_request m.s m.n vo p agent
            | _ -> halt agent, []
        end
      | Accept v -> handle_accept m v agent
      | Accepted -> handle_accepted m agent
      | _ -> halt agent,[]
end : ALGO)

What does this do?


The good news is that there are quite a number of paths from the initial state I0I0I0 that reach our happy state I1I1I1, but there are also a lot of scenarios that end up in bad states.
Let’s look at one in detail.

R1I0I0:{Prepare;1->3;n=1} --->
R1I0P1:{Promise;3->1;n=1} --->
L1I0P1:{Accept; 1->2;n=1} --->
L1H0P1

What happened here? A Prepare message goes from agent 1 to agent 3, and that agent sends a Promise back.
This causes agent 1 to become a leader and broadcast Accept messages. One of these reaches agent 2, which is clueless as it did not receive a Prepare message first. Agent 2 therefore halts.

The diagram allows us to understand scenarios that lead to bad states, and to modify the algorithm accordingly. This process of finding holes in your algorithm, patching them and iterating is something I call caulking, for lack of a better word. In this particular case, an agent that is Idle can receive an Accept for the next n, and should be able to move to the Idle state at that next n.

What about dropped messages?

Earlier, I did not answer the question about the simulation of dropped messages. The above scenario should make clear that we are actually in luck: there is no difference between that scenario and one where the Prepare from agent 1 to agent 2 was dropped. In general, there is no difference between dropping a message and delaying it until it is no longer relevant. This means there is no need for us to simulate dropped messages at all!

Mark2

Let’s caulk Mark1. Looking at the diagram, not a lot of things need to be fixed. Here’s a list of messages that go awry.

  • Accept;n when agent is Idle at pred n
  • Accepted;n when agent is already Idle at n
  • Promise;n when agent is already Leading at n
  • Promise;n when agent is already Idle at n

Ok, adapting the code is easy:

module Mark2 = (struct

  let prepare_when_idle source n agent= 
    let an = agent._n in
    if n > an 
    then
      let pv = None in
      let agent' = {agent with state = SPromised pv; _n = n;} in
      let msg = {n = n;s = agent.id;t = source; k = Promise pv } in
      let out = [msg] in
      agent', out
    else
      halt agent,[]
    
  let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent = 
    let pv,pballot = proposal in
    if mn = agent._n
    then
      let pballot' = Ballot.register_vote pballot source vo in
      if Ballot.have_majority pballot' 
      then
        let value = Ballot.pick_value pballot' in
        let outstanding_acks = Ballot.quorum pballot' -1 in
        let me = agent.id in
        let targets = others agent in
        let make_msg t = {n = mn; s = me; t ; k =  Accept value} in
        let broadcast = List.map make_msg targets in
        let agent' = { agent with 
            store = (mn,value) :: agent.store;
            state = SLead (value, outstanding_acks);
        }
        in
        agent', broadcast
      else
        agent, []
    else
      halt agent, []
        
  let handle_accept m v agent = 
    let _accept m =         
      let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
      let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
      agent', out
    in
    match agent.state with
      | SPromised vo when agent._n = m.n -> _accept m
      | SIdle when (next agent._n) = m.n -> _accept m
      | _ -> halt agent, []
    
      
  let handle_accepted m agent =
    match agent.state with
      | SLead (v,out) when agent._n = m.n -> 
        let out' = out -1 in
        let state' = if out' = 0 then SIdle else SLead(v,out') in
        {agent with state = state'},[]          
      | SIdle when agent._n = m.n -> agent,[]
      | _ -> halt agent, []


  let handle_message agent m = 
    match m.k with
      | Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
      | Promise vo -> 
        begin
          match agent.state with 
            | SRequest p -> promise_for_request m.s m.n vo p agent
            | SLead(v,out) when agent._n = m.n -> agent, []
            | SIdle when agent._n = m.n -> agent, []
            | _ -> halt agent, []
        end
      | Accept v -> handle_accept m v agent
      | Accepted -> handle_accepted m agent
      | _ -> halt agent,[]
end : ALGO)

Look at the output diagram:

Isn’t it nice that fixing the holes in our algorithm actually makes the diagram smaller? Since we don’t end up in bad states anymore, there are far fewer transitions. It’s also aesthetically pleasing that graphviz draws all arrows from left to right, meaning there are no transitions that actually increase the distance between the current state and the state we’re aiming for.

What about agents that are wiped clean?

This kind of calamity is not too difficult to simulate. Basically it’s a move that puts the agent back in its starting state. Let’s add the possibility that one of the agents is wiped.


let generate_moves state = 
  if is_bad state 
  then []
  else
    let deliver = List.map (fun x -> DeliverMsg x) state.net in
    let id3 = Id 3 in
    let agent = find_agent state.ags id3 in
    let wipe = 
      if is_halted agent 
      then [] 
      else [Wipe id3]
    in
    deliver @ wipe 
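The fragment above assumes the move type has grown a second constructor next to DeliverMsg. Here is a minimal sketch of that extension and of how execute_move could apply it, with heavily simplified, hypothetical agent and message records (the real agents also carry a state). It assumes that wiping only resets the agent to what make_agent produces and leaves the messages in flight untouched; the deliver parameter stands in for the existing delivery logic.

```ocaml
(* Simplified, hypothetical stand-ins for the post's types. *)
type id = Id of int
type agent = { id : id; pop : id list; n : int; store : (int * string) list }
type message = { src : id; dst : id; payload : string }

(* The move type gains a Wipe constructor next to DeliverMsg. *)
type move = DeliverMsg of message | Wipe of id

(* Starting state of an agent (assumed equivalent of make_agent). *)
let make_agent id pop = { id; pop; n = 0; store = [] }

(* Wiping puts one agent back in its starting state. *)
let wipe id agents =
  List.map (fun a -> if a.id = id then make_agent a.id a.pop else a) agents

(* Assumption: a wipe leaves the network as it is. *)
let execute_move deliver move (net, agents) =
  match move with
  | DeliverMsg m -> deliver m (List.filter (fun x -> x <> m) net) agents
  | Wipe id -> (net, wipe id agents)
```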

Let’s try that…

./paxos.byte > mark3.dot
bad path:
1 ---(1: Prepare) ---> 3
3 ---(1: Promise) ---> 1
1 ---(1:  Accept) ---> 2
1 ---(1:  Accept) ---> 3
Wipe 3
L1I0I0

Ouch. There actually is something wrong here. As it turns out, there is a bug in the Mark2 module.
It’s this fragment that’s wrong:

 let handle_accept m v agent = 
    let _accept m =         
      let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
      let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
      agent', out
    in
    match agent.state with
      | SPromised vo when agent._n = m.n -> _accept m
      | SIdle when (next agent._n) = m.n -> _accept m
      | _ -> halt agent, []

Handling an Accept when the agent is in the Idle state should also set the agent’s n to the n of the message. Let’s fix that and try again.
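Here is a minimal, self-contained sketch of the corrected handler. The types are trimmed-down copies of the ones above, and n is assumed to be an int with next as its successor (the post shows neither definition). The only change with respect to Mark2 is _n = m.n in _accept; in the SPromised case it changes nothing, because the guard already requires agent._n = m.n.

```ocaml
(* Trimmed-down copies of the post's types; n = int is an assumption. *)
type id = Id of int
type n = int
let next (n : n) : n = n + 1

type 'v kind = Prepare | Promise of 'v option | Accept of 'v | Accepted
type 'v message = { n : n; s : id; t : id; k : 'v kind }

type 'v agent_state = SHalted | SIdle | SPromised of 'v option
type 'v agent = { id : id; _n : n; state : 'v agent_state; store : (n * 'v) list }

let halt agent = { agent with state = SHalted }

let handle_accept m v agent =
  let _accept m =
    (* the fix: also adopt the round number of the message *)
    let agent' = { agent with state = SIdle; _n = m.n; store = (m.n, v) :: agent.store } in
    let out = [ { n = m.n; s = agent.id; t = m.s; k = Accepted } ] in
    (agent', out)
  in
  match agent.state with
  | SPromised _ when agent._n = m.n -> _accept m
  | SIdle when next agent._n = m.n -> _accept m
  | _ -> (halt agent, [])
```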
Here is the resulting diagram:

Again, we’re confronted with lots of bad states, but we know how to fix that. What are the scenarios that bring us into bad states? As it turns out, these are all caused by old Prepare messages. Handling those and generating the diagram again yields:

Indeed, wiping an agent moves to a state somewhere to the left of the current state, which matches the idea of being further away from our goal.
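The fix for old Prepare messages is not shown above. One plausible version, and only an assumption: a Prepare for a round the agent has already reached or passed is stale, so it is dropped instead of halting the agent. The types below are trimmed-down, hypothetical copies of the ones above, and prepare_when_idle is the Mark2 version.

```ocaml
(* Trimmed-down, hypothetical copies of the post's types. *)
type id = Id of int
type 'v kind = Prepare | Promise of 'v option
type 'v message = { n : int; s : id; t : id; k : 'v kind }
type 'v agent_state = SHalted | SIdle | SPromised of 'v option
type 'v agent = { id : id; _n : int; state : 'v agent_state }

let halt agent = { agent with state = SHalted }

(* Same logic as prepare_when_idle in Mark1/Mark2. *)
let prepare_when_idle source n agent =
  if n > agent._n then
    let agent' = { agent with state = SPromised None; _n = n } in
    (agent', [ { n; s = agent.id; t = source; k = Promise None } ])
  else (halt agent, [])

(* Assumed fix: ignore stale Prepares instead of halting on them. *)
let handle_prepare m agent =
  if m.n <= agent._n then (agent, [])
  else
    match agent.state with
    | SIdle -> prepare_when_idle m.s m.n agent
    | _ -> (halt agent, [])
```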

Are we there yet?

So far, we only addressed cases where there is only 1 agent in a requesting state. So what would happen if there are two agents requesting something at the same time?
Happy caulking!

Closing Remarks

Arriving at a correct implementation of an algorithm is difficult, but even more so in the case of distributed systems. This blog post shows a strategy you can apply in caulking your own implementations. As stated before, making your implementation pure helps you a lot.

Have fun,

Romain.


What are you playing with today?

introduction

A colleague of mine recently asked me how I do research, and where the ideas come from.
The conversation went something like this:

NN: yeah, but how do you learn all these things?
ME: It’s a side-effect, really. Let me explain: when in life do you learn the most?
NN: It must be childhood…
ME: Yes, and why is that?
NN: The brain is more flexible when you’re young…
ME: Yes, that helps considerably, but that’s not the full answer.
The thing that especially helps is that you were playing.
NN: So, you’re telling me I should play more?
ME: Yes, but in order to get away with it, you call it research.

Currently, I’m playing with doing research about persistent data structures.
Literature uses a number of different names (CoW, append-only, log-structured, persistent, …) for data structures where updates (at least conceptually)
never change anything, which means you have access to all previous versions.
This is associated with a lot of buzzwords like instant snapshotting, cheap transactions, SSD-optimized, hot copy, …

Append-only binary tree

To illustrate the concept, I’ll show you how to play with an append-only binary-ish tree (a B-tree would just be more work, without additional fun).

binary tree variation

Let’s start off with some pictures of the data structure we want to implement.
The append-only representation will be shown in the next section.
Initially, the tree is empty and there is nothing to show, but after adding the first key value pair (“f”,“F”), which just means that “f” is mapped to “F”,
the tree looks like this:



A key node (shown as an oval, and called a leaf) just points to a value node (shown as a box).

This is the tree after we insert a second mapping (“d” maps to “D”):



A branch node (shown as a trapezium) splits the key space in two: everything smaller than or equal to its separator (“d”) dangles left, everything else dangles right.
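Before turning to the append-only layout, here is a sketch of the same tree shape as an ordinary immutable OCaml value (the tree type, set and get are hypothetical, not the code used further on). It applies the rule above: keys smaller than or equal to a separator go left. Because set rebuilds only the nodes on the path from the root to the changed leaf and shares everything else, older versions of the tree remain usable.

```ocaml
(* In-memory, immutable variant of the tree (a sketch). *)
type tree =
  | Empty
  | Leaf of string * string            (* key, value *)
  | Branch of tree * string * tree     (* keys <= separator go left *)

let rec set t k v =
  match t with
  | Empty -> Leaf (k, v)
  | Leaf (k0, _) when k = k0 -> Leaf (k, v)
  | Leaf (k0, _) when k < k0 -> Branch (Leaf (k, v), k, t)
  | Leaf (k0, _) -> Branch (t, k0, Leaf (k, v))
  | Branch (l, sep, r) when k <= sep -> Branch (set l k v, sep, r)
  | Branch (l, sep, r) -> Branch (l, sep, set r k v)

let rec get t k =
  match t with
  | Leaf (k0, v) when k = k0 -> Some v
  | Branch (l, sep, r) -> get (if k <= sep then l else r) k
  | _ -> None
```

Inserting f, d, h, a and z in that order produces the tree described in this section, with “d” as the separator at the root.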

After adding the mappings (“h”,“H”), (“a”,“A”) and (“z”,“Z”) we have the following tree:


append-only representation

The name append-only stems (I think) from the file-based view, where you only write at the end of the file (append).
It is also called a log.
For the one element tree, this is simple, once you realize that since “f” points to “F” you need to write value “F” first before you write the leaf “f”:



The number at the left of an entry denotes its position in the log, so later nodes can refer to it (the 0 in the leaf “f” points to the value at position 0).
Adding stuff can only be done at the right hand side, and things can only point to things that have already been written.
Inserting (“d”,”D”) means first writing the value “D”, then writing the leaf “d”, and finally the branch node.
The result is shown below:



Ok, adding (“h”,“H”), (“a”,“A”) and (“z”,“Z”) yields the following:



You immediately see that the space overhead can be considerable.
At some point in time, you don’t care anymore about older snapshots and you will want to have the possibility to reclaim that space.
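How much could be reclaimed? A minimal sketch, assuming that only the latest snapshot is kept: mark everything reachable from the root (the last entry) and treat the rest as garbage. The entry type is the one from the minimal implementation below, and the array holds the 18 entries of the example dump shown further down, after inserting f, d, h, a and z.

```ocaml
(* Entry type from the minimal implementation. *)
type entry = NIL | V of string | L of string * int | N of int * string * int

(* The 18 entries of the example dump. *)
let example = [|
  V "F"; L ("f", 0); V "D"; L ("d", 2); N (3, "d", 1);
  V "H"; L ("h", 5); N (1, "f", 6); N (3, "d", 7);
  V "A"; L ("a", 9); N (10, "a", 3); N (11, "d", 7);
  V "Z"; L ("z", 13); N (6, "h", 14); N (1, "f", 15); N (11, "d", 16) |]

(* Mark every position reachable from the root (the last entry).
   Unmarked entries only belong to older snapshots. *)
let live es =
  let marked = Array.make (Array.length es) false in
  let rec visit p =
    if p >= 0 && not marked.(p) then begin
      marked.(p) <- true;
      match es.(p) with
      | L (_, c) -> visit c
      | N (l, _, r) -> visit l; visit r
      | V _ | NIL -> ()
    end
  in
  visit (Array.length es - 1);
  marked
```

For that example, 4 of the 18 entries (positions 4, 7, 8 and 12) are no longer reachable from the latest root.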

minimal implementation

We still don’t have anything to play with. Let’s do a minimal implementation.
First start with a log.

type v = string
type k = string
type pos = int

type entry = 
  | NIL 
  | V of v
  | L of k * pos
  | N of pos * k * pos 

type log = {es:entry array;  mutable next : int;}

Entries are either nil, values, leaves or nodes. Btw, this is ocaml.
A log is an array, combined with an attribute next that tells you where next to ‘write’.
Some helper functions are needed.

let make cap = {es = Array.make cap NIL; next = 0;}
let root_pos log = log.next -1
let get_entry log pos = if pos = -1 then NIL else log.es.(pos)
let get_length log = Array.length log.es
let free log = get_length log - log.next
  
let write log es =
  let do_one e = 
    if free log = 0 then failwith "full";
    let p = log.next in
    log.es.(p) <- e;
    log.next <- p + 1 
  in
  List.iter do_one es

Retrieving a value is straightforward: just descend starting from the root.

let get log k = 
  let rec find_pos pos = 
    match get_entry log pos with
      | V v -> v
      | L (k0,p0) when k = k0 -> find_pos p0 
      | N (l,k0,r) -> let p' = if k <= k0 then l else r  in
		      find_pos p'			
      | _ -> failwith "Not_found"
  in
  find_pos (root_pos log)

Adding a mapping isn’t that hard. The trick is to record the places you visited on the way down, these are exactly the entries that will need to be rewritten.

type dir = Hit | Left | Right
type trail = (dir * entry * pos) list

exception Todo of trail * string
let todo v msg = raise (Todo (v,msg))

After introducing the direction and the trail, I added an exception for cases not handled.
The skeleton for the ‘set’ is rather straightforward:

let set log k v =
  let rec descend trail pos = 
    match get_entry log pos with
      | NIL -> trail
      | V v -> failwith "corrupt"
      | L (k0,p0) as e -> let dir = 
			    if k = k0 
			    then Hit else if k < k0 
			      then Left 
			      else Right 
			  in
			  (dir , e, pos) :: trail
      | N (l,k0,r) as e when k <= k0 -> descend ((Left,e, pos) :: trail) l
      | N (l,k0,r) as e              -> descend ((Right,e,pos) :: trail) r
  in
  let trail = descend [] (root_pos log) in
  let update = build_set k v log.next trail in
  write log update

So first we go down the tree accumulating a trail, which we use to build the update that gets written to the log.
Let’s build the update:

let rec build_set k v start (visited:trail)  = V v :: do_start start k visited 
and do_start start k visited = match visited with
  | [] -> [L (k, start) ]
  | (dir , L(k0,p0),pe) :: rest ->
    begin
      let e = L(k,start) in
      match dir with
	| Hit   -> e :: do_rest (start + 1) rest
	| Left  -> e :: N(start + 1, k, pe) :: do_rest (start + 2) rest
	| Right -> e :: N(pe, k0, start +1) :: do_rest (start + 2) rest
    end
  | _ -> todo visited (Printf.sprintf "do_start %i '%s'" start k)
and do_rest start visited = 
  match visited with
    | [] -> []
    | h :: t ->
      let e = 
	match h with
	  | (Left , N(pl,k0,pr),pe)  ->  N(start,k0,pr) 
	  | (Right, N(pl,k0,pr),pe)  ->  N(pl,k0,start)
	  | _ -> todo visited (Printf.sprintf "do_rest %i" start)
      in
      e :: do_rest (start + 1) t

Creating the update is done in three steps, where do_rest is the most difficult.
The key insight is that if on the way down you took the left branch, that part of the node needs to change, while the other part can be copied.
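To see how the positions work out, here is build_set again, copied with its types so the block runs on its own, plus traces for three of the inserts from the example log. start is the position where the new value will be written (log.next), so the new leaf lands at start + 1 and the first rebuilt branch, if any, at start + 2; each call of do_rest then points its rebuilt node at the position written just before it.

```ocaml
type entry = NIL | V of string | L of string * int | N of int * string * int
type dir = Hit | Left | Right
type trail = (dir * entry * int) list

exception Todo of trail * string
let todo v msg = raise (Todo (v, msg))

(* build_set as above; the trail is innermost entry first. *)
let rec build_set k v start (visited : trail) = V v :: do_start start k visited
and do_start start k visited =
  match visited with
  | [] -> [ L (k, start) ]                 (* empty tree: just a leaf *)
  | (dir, L (k0, _), pe) :: rest -> (
      let e = L (k, start) in               (* new leaf at start + 1 *)
      match dir with
      | Hit -> e :: do_rest (start + 1) rest
      | Left -> e :: N (start + 1, k, pe) :: do_rest (start + 2) rest
      | Right -> e :: N (pe, k0, start + 1) :: do_rest (start + 2) rest)
  | _ -> todo visited (Printf.sprintf "do_start %i '%s'" start k)
and do_rest start visited =
  (* [start]: position of the subtree rebuilt one level down *)
  match visited with
  | [] -> []
  | h :: t ->
      let e =
        match h with
        | Left, N (_, k0, pr), _ -> N (start, k0, pr)
        | Right, N (pl, k0, _), _ -> N (pl, k0, start)
        | _ -> todo visited (Printf.sprintf "do_rest %i" start)
      in
      e :: do_rest (start + 1) t
```

The three results line up with positions 2 to 4, 5 to 8 and 9 to 12 of the example dump further down.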

interactivity, please

The piece of code below allows you to dump a log.

let dump log = 
  Array.iteri (fun i e->
    let () = Printf.printf "%2i: " i in
    match e with
      | NIL        -> print_newline()
      | V v        -> Printf.printf "V \"%s\"\n" v
      | L (k,p)    -> Printf.printf "L(\"%s\",%i)\n" k p 
      | N (l,k0,r) -> Printf.printf "N(%i,\"%s\",%i)\n" l k0 r
  ) log.es

An example dump looks like this:


 0: V "F"
 1: L("f",0)
 2: V "D"
 3: L("d",2)
 4: N(3,"d",1)
 5: V "H"
 6: L("h",5)
 7: N(1,"f",6)
 8: N(3,"d",7)
 9: V "A"
10: L("a",9)
11: N(10,"a",3)
12: N(11,"d",7)
13: V "Z"
14: L("z",13)
15: N(6,"h",14)
16: N(1,"f",15)
17: N(11,"d",16)
18: 

But examining this to see if all references are correct is rather tedious.
With a few lines of code you can generate output that can be processed by graphviz to generate images that allow visual inspection.

let dot_log ?(f = stdout) log = 
  Printf.fprintf f "digraph Log{\n";
  Printf.fprintf f "\trankdir=\"RL\";\n";
  Printf.fprintf f "\tnode [shape=record];\n";
  let () = Array.iteri (fun i e ->
    let () = match e with
      | NIL -> ()
      | V v     -> 
	Printf.fprintf f "\tnode%i [label = \"{%i|%s}\"];\n" i i v
      | L (k,p) -> 
	Printf.fprintf f 
	  "\tnode%i [label = \"{%i | { %s | <f1> %i} }\"];\n" 
	  i i k p;
	Printf.fprintf f "\tnode%i:<f1> -> node%i;\n" i p
      | N(l,k0,r)  -> Printf.fprintf f "\tnode%i [label = \"{%i| { <f1> %i | %s | <f2> %i}}\"];\n" i i l k0 r;
	Printf.fprintf f "\tnode%i:<f1> -> node%i;\n" i l;
	Printf.fprintf f "\tnode%i:<f2> -> node%i;\n" i r;
    in
    if e <> NIL && i > 0 then Printf.fprintf f "\tnode%i -> node%i [style = invis];\n" i (i-1)
  ) log.es in
  Printf.fprintf f "}"

This needed a bit of experimentation, but the record feature helps a lot.
To ensure things are rendered in the correct left-to-right order, every entry has an invisible arrow to its predecessor.
The last fragment glues graphviz and evince together to get the kind of interactive visualization you appreciate from MATLAB or SciPy.

let view ?(v=dot_log) log = 
  let root = "test" in
  let dot = Filename.temp_file root ".dot" in
  let png = Filename.temp_file root ".png" in
  let oc = open_out dot in
  let () = v ~f:oc log in
  close_out oc;
  let convert_cmd = Printf.sprintf "dot -Tpng -o %s %s" png dot in
  let _ = Sys.command convert_cmd in
  let cmd = Printf.sprintf "evince %s" png in
  Sys.command cmd

let view_log  log = view ~v:dot_log log

Ok, this calls for a screenshot:


closing remarks

The essence is that I built myself a nifty toy to gain insight into persistent data structures in less than 150 lines of code,
which is a tribute to the power of ocaml.
All that may be true, but the code isn’t that minimal, and not very pretty:

  • build_set can be made tail recursive and shorter as well.
  • The mutually recursive functions are not needed either, if they are defined in the right order.
  • leaf nodes can be suppressed.
  • dot_log feels hackish.

Most of the time spent writing the code actually went into trying to seduce graphviz into doing what I wanted.

Have fun,

Romain.

Post mortem

This was my first blog post ever.
I should probably have done something shorter as I’m still learning the wordpress web interface.