Arakoon 1.6.0 is out
Posted: July 8, 2013 Filed under: Arakoon | Tags: arakoon, distributed systems, optimization, paxos

H: One voter, 16,472 votes — a slight anomaly…?
E: Not really, Mr. Hanna. You see, Baldrick may look like a monkey who’s
been put in a suit and then strategically shaved, but he is a brilliant
politician. The number of votes I cast is simply a reflection of how
firmly I believe in his policies.

(Black Adder III, Episode 1)
Introduction
So, Arakoon 1.6.0 is out, a release so big we just had to skip a minor version! This blog post walks you through the changes between the 1.4 series and the 1.6 series. We'll explain why it's faster, and give an indication of how much faster it is.
Harvesting the client queue
Arakoon is a multi-Paxos implementation. This means that Arakoon nodes try to reach consensus on the next transaction that needs to be applied.
The thing they reach consensus on is called a paxos value. In all Arakoons up to 1.4 there was a 1:1 mapping between a client update and a paxos value. This works fine on a LAN, but when the latency between nodes becomes a lot higher, as in a WAN setup, performance becomes abysmally slow. Some of our users run Arakoon in a multi-datacenter setup, so the solution we chose was an n:1 mapping between client updates and paxos values. When the time has come to decide on the next paxos value, Arakoon 1.6 takes all the queued client updates (up to a limit) and stuffs them into that single paxos value. For multiple clients issuing small updates, this means an almost linear speedup in a WAN setting.
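To sketch the idea (the queue representation, the helper names and the limit of 32 below are illustrative assumptions, not the actual Arakoon internals): when it is time to propose the next paxos value, the node drains its client queue up to some limit and proposes the whole batch at once.

let harvest (queue : 'update Queue.t) ~(limit : int) : 'update list =
  let rec take acc i =
    if i = limit || Queue.is_empty queue
    then List.rev acc                 (* preserve the original client order *)
    else take (Queue.pop queue :: acc) (i + 1)
  in
  take [] 0

(* one paxos round now decides on a whole batch instead of a single update *)
let next_paxos_value queue = `Batch (harvest queue ~limit:32)

The per-round WAN latency is then amortised over every update in the batch, which is where the speedup comes from.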
Batched local store
Ops: We have seen a corrupt tokyo cabinet, and we can reproduce it!
Dev: Great! What’s the scenario?
Ops: It’s really easy. Just apply the following 218 million transactions to an empty store.
Dev: Err, euhm, thx ?!
When you want to fix a problem, having a 100% reproducible scenario is a blessing, but not if reproducing it takes forever. Fortunately, the root cause analysis of such a problem leaves you time to think, and thus the insight came: we can speed this up a lot! Having to maintain a transaction log is a mixed blessing, but one of its advantages is that you don't have to apply every update to the database immediately. So we added an in-memory caching layer that sits in front of Tokyo Cabinet. All updates happen there, and from time to time we push them through in one big batch. We can afford to do this because durability is already ensured by the transaction logs.
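A minimal sketch of what such a write-back layer could look like (the module, the flush threshold and the push_to_db callback are assumptions for illustration; the real code lives in the Arakoon repository):

module Batched_store = struct
  type t = {
    cache : (string, string option) Hashtbl.t;  (* None encodes a delete *)
    mutable pending : int;
    threshold : int;
  }

  let make ?(threshold = 10_000) () =
    { cache = Hashtbl.create 1024; pending = 0; threshold }

  let flush t push_to_db =
    Hashtbl.iter push_to_db t.cache;   (* one big batch towards the database *)
    Hashtbl.reset t.cache;
    t.pending <- 0

  let set t push_to_db k v =
    Hashtbl.replace t.cache k (Some v);
    t.pending <- t.pending + 1;
    if t.pending >= t.threshold then flush t push_to_db
end

A real implementation of course also has to answer reads from the cache first and only fall back to the database for keys that were not recently touched.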
Logging improvements
Since the early days, Arakoon has had a crash logger: when an Arakoon node dies unexpectedly, it dumps its last 1000 or so log statements into a separate file. This is a really useful feature that has allowed us to determine the underlying cause of death on many occasions. Unfortunately, it was also a very expensive feature: we were always generating debug-level log statements, even if the configuration said 'info'. Logging is expensive even if you throw the log statements away; there is a big cost in generating all these strings via fancy Printf templates. So we decided to use a syntax extension that lets us create these strings lazily.
So now we have the best of both worlds: when all goes well, we don’t need debug statements and we never pay the cost of constructing the string, but when things turn sour,
we can still go back and create our crash log.
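The gist of the trick, as a hand-wavy sketch (the ring buffer and the names below are assumptions; the real mechanism is a syntax extension): instead of rendering a debug string eagerly, keep a closure around and only force it when the text is really needed, for instance while dumping a crash log.

(* keep the last debug statements unrendered; Printf only runs when a
   crash log is actually written *)
let ring : (unit -> string) Queue.t = Queue.create ()
let ring_size = 1000

let log_debug (msg : unit -> string) =
  Queue.push msg ring;
  if Queue.length ring > ring_size then ignore (Queue.pop ring)

let dump_crash_log oc =
  Queue.iter (fun msg -> output_string oc (msg () ^ "\n")) ring

(* usage: no string is built here unless the crash log gets dumped *)
let () = log_debug (fun () -> Printf.sprintf "accepted paxos value n=%i" 42)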
Numbers please
We're not going to show full-fledged benchmarks, just a simple microbenchmark to show a trend. The experiment is to let one or more clients each do a million sets against an Arakoon cluster and see how long it takes. All experiments were performed on a simple Intel Core i7 with an HDD.
| 1e6 sets/client | Arakoon 1.4.3 | Arakoon 1.6.0 |
| --------------- | ------------- | ------------- |
| 1 client        | 294s          | 196s          |
| 4 // clients    | 900s          | 625s          |
| 8 // clients    | 2180s         | 1150s         |
How to read the table? 1 client does 1e6 sets on Arakoon 1.6.0 and finishes after 196s. 8 clients do 1e6 sets in parallel on Arakoon 1.6.0 and the last client finishes after 1150s. That looks like a rather nice improvement.
The next table is for a cluster of 3 nodes, all running on the same machine and thus sharing the same HDD.
| 1e6 sets/client | Arakoon 1.4.3 | Arakoon 1.6.0 |
| --------------- | ------------- | ------------- |
| 1 client        | 1090s         | 676s          |
| 4 // clients    | 4005s         | 2470s         |
| 8 // clients    | 8734s         | 4700s         |
A last table shows the effect of latency. We add 50 ms of latency to everything that goes over the loopback interface (via netem). Again, this is a setup with 3 nodes, all on the same host, sharing an HDD (although that will not matter too much here).
| 1e5 sets/client | Arakoon 1.4.3 | Arakoon 1.6.0 |
| --------------- | ------------- | ------------- |
| 8 // clients    | 86400s        | 20600s        |
Note that this benchmark only goes to 1e5 sets per client (otherwise it would take forever). You can immediately see the dramatic effect of latency on a distributed system. Arakoon 1.6.0 does a better job of fighting latency than Arakoon 1.4.3, and the difference in performance increases with the number of clients.
Other notable improvements
Tokyo Cabinet patched
The whole data corruption anecdote eventually led to a 32-bit overflow issue in Tokyo Cabinet. With the help of some friends we fixed it and made our changes available in this Tokyo Cabinet git repo. We really don't want to expose anyone to database corruption, so we decided to backport this fix to the 1.4 series too.
Log sections
We started to use log sections, which allow you to configure a node in such a way that, for example, the messaging layer logs at debug level while everything else stays at info level. This lets you focus on whatever you're examining without having to wade through zillions of log messages. Handy for ops and devs alike.
Client API additions
We added two small but useful methods to the client API. First, there's AssertExists of key, which allows you to shortcut a sequence if a key is not there. Second, there's multi_get_option: key list -> value option list Lwt.t, which allows you to fetch a series of values even if you're not sure all of them exist in the database.
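A hypothetical usage sketch of the second addition (only the multi_get_option signature above comes from this post; treating keys and values as strings and the little report function are assumptions):

(* fetch several values in one round trip, tolerating missing keys *)
let report (keys : string list) : unit Lwt.t =
  let open Lwt.Infix in
  multi_get_option keys >>= fun (vos : string option list) ->
  List.iter2
    (fun k vo ->
       match vo with
       | Some v -> Printf.printf "%s -> %s\n" k v
       | None   -> Printf.printf "%s is absent\n" k)
    keys vos;
  Lwt.return_unit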
Always sync
Some of our users have fancy SSDs for the tlogs and wanted to be able to fsync after every write of a tlog entry. We accommodated them.
Full details
You can find the full list of improvements for Arakoon 1.6 in our bug tracker.
Conclusion
It’s stable, way faster than it used to be, and it’s free.
You can get it from our GitHub repo.
What are you waiting for?
Caulking your distributed algorithm implementation
Posted: October 25, 2012 Filed under: algorithms, OCaml | Tags: arakoon, distributed systems, graphviz, OCaml

Me: Ok, all unit tests succeed.
All system tests succeed.
All acceptance tests succeed.
Release!
(some time later)
NN: We have a problem with your distributed database.
Me: Ok, what’s the matter?
NN: Once in a while the cluster seems to get stuck.
Me: Stuck how?
NN: It seems to be unable to elect a master.
Me: What did you do to get there?
NN: We don’t know, but we have the logs. Here they are. Happy hunting.
Me: $#%#&*!
(some time later)
Me: Aha, that’s what went wrong!
Introduction
The Arakoon team has spent a big part of the last two years on variations of this theme. Some of the problems turned out to be configuration problems (sometimes even of other clusters), but there definitely were situations where the issue was caused by an implementation error. As it turns out, they all had the same cause: not all possible scenarios were covered. So, is it possible to escape this vicious circle?
We already learned some things (see this previous blog post), so we use asynchronous message passing and removed all I/O from our implementation. So what's left?
Well, we have a set of distributed objects with well-defined states, and a set of messages that can be exchanged between them. Can't we just generate all possible states of the distributed system and all possible messages, and check the relationships between them? This is non-trivial, as the number of possible states is infinite and so is the number of possible messages. On the other hand, most state combinations are bad and inconsistent, and most messages are irrelevant… There might be something we can do.
Growing a state space-ish diagram
What if we start from a consistent starting state for all objects (let's call this the system state), generate all relevant messages that can be exchanged from that state, and apply them in all possible orders? The system states that can be reached in this way from the starting state should all be consistent. If we find a state that's not consistent, we stop. For consistent system states, we can iterate. What about inconsistent states? Well, clearly this means our algorithm is capable of doing bad things. We should check the scenario that produced them, fix it, and iterate again. Is this doable? Well, maybe… and what about simulating dropped messages?
Humble beginnings
Let’s start small. What about growing a Basic Paxos implementation for a system of three nodes? Modelling the messages should not be too difficult:
type 'v kind =
  | Prepare
  | Promise of 'v option
  | Accept of 'v
  | Accepted

type 'v message = {n : n; s : id; t : id; k : 'v kind}
Each message is associated with a paxos round n, has a source s and a target t, and has semantics described by its kind k. Finally, there's some value type ('v) for the things the system should try to find consensus on. (You can find the code on GitHub.)
Modelling the agents is a bit more work:
type 'v agent_state =
  | SHalted
  | SIdle
  | SRequest of ('v proposal)
  | SPromised of 'v option
  | SLead of ('v * int) (* value, outstanding acks *)

type 'v agent = {
  id : id;
  pop : id list;
  _n : n;
  state : 'v agent_state;
  store : (n * 'v) list;
}
An agent has an id, knows the other agents (pop), and has a store and a current paxos round n. The interesting part is the inner state representing the role it's playing: it can be halted, idle, requesting to become a leader for a proposal, or leading an update.
Now that we have messages and agents, we can model the state of the system.
type 'v network = 'v message list
type 'v agents  = 'v agent list
type 'v state   = { net : 'v network; ags : 'v agents }
The system state is merely a collection of agents (ags) and a network (net) representing the messages that can be delivered to the agents.
How can the state of the system change? First of all, the delivery of a message most likely will have an impact. We might add other moves later.
type 'v move = | DeliverMsg of 'v message
Generating all moves is now easy: for every message in the network there is a move that delivers it, and since we want to stop in bad states, we don't generate any moves there:
let generate_moves state =
  if is_bad state
  then []
  else
    let deliver = List.map (fun x -> DeliverMsg x) state.net in
    ...
How about executing a move? There is some administration to do there.
We have a move to execute, a current state, a path that was followed to arrive at that state, and a set of observed transitions. If the move is the delivery of a message, we find the target agent and let it handle the message. This will change the agent's state and produce some outgoing messages (extra).
let execute_move move state path observed =
  let deliver m ag =
    let agent = find_agent ag m.t in
    let agent', extra = A.handle_message agent m in
    let ag' = replace_agent agent' ag in
    ag', extra
  in
  let state' = match move with
    | DeliverMsg m ->
      let net' = remove m state.net in
      let ags', extra = deliver m state.ags in
      { net = net' @ extra; ags = ags' }
Executing a move yields a new system state. We record the transition from the old state to the new one under this move, and extend the path.
  let observed' =
    let transition = (state_label state, move, state_label state') in
    TSet.add transition observed
  in
  state', (move, state) :: path, observed'
The whole simulator is a few functions away:
let rec check_all level state path observed =
  if not (is_consistent state) then raise (Bad (path, state));
  if level = limit
  then observed
  else
    let rec loop observed = function
      | [] -> observed
      | move :: rest ->
        let state', path', observed' = execute_move move state path observed in
        let observed'' = check_all (level + 1) state' path' observed' in
        loop observed'' rest
    in
    let moves = generate_moves state in
    loop observed moves

let run state0 = check_all 0 state0 [] TSet.empty
Basically we start from an initial state, go down the tree of possible moves, execute all these and accumulate observed transitions.
Generating a diagram is trivial with Graphviz: just iterate over the observed transitions (not shown here; see GitHub for details).
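For completeness, a possible shape of that diagram generation (the real dottify is in the repo; label_of_move and the exact output are assumptions):

(* emit one DOT edge per observed transition *)
let dottify _state0 observed =
  print_string "digraph paxos {\n  rankdir=LR;\n";
  TSet.iter
    (fun (from_lbl, move, to_lbl) ->
       Printf.printf "  %S -> %S [label=%S];\n"
         from_lbl to_lbl (label_of_move move))
    observed;
  print_string "}\n"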
The simulation
We create 3 agents, let the first one start on a value, run our simulator from the starting state, and dottify the observed transitions.
let main () =
  let ids = List.map id [1;2;3] in
  let a1, a1_out = start (make_agent (Id 1) ids) "x" in
  let a2, a2_out = (make_agent (Id 2) ids), [] in
  let a3, a3_out = (make_agent (Id 3) ids), [] in
  let world = [a1; a2; a3] in
  let state0 = { net = a1_out @ a2_out @ a3_out; ags = world } in
  try
    let observed = M.run state0 in
    M.dottify state0 observed
  with (M.Bad (path, state)) ->
    Printf.eprintf "bad path:\n";
    M.dump_path path;
    Printf.eprintf "%s\n" (state_label state)
Mark0
Let’s try this on a brain-dead simple implementation of an agent.
One that goes to the halted state as soon as it receives a message, while sending out no message at all.
module Mark0 = (struct
  let handle_message agent message = halt agent, []
end : ALGO)
What do we see here? First, there is a labelling scheme for states: R1I0I0 means the first agent has n=1 and is in the Requesting state, while the second and third agents are in the Idle state with n=0.
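The labelling could be produced by something along these lines (a sketch; the letter-per-role mapping follows the text, int_of_n is an assumption):

(* one letter for the role, one digit for the agent's paxos round n *)
let agent_label a =
  let role = match a.state with
    | SHalted     -> "H"
    | SIdle       -> "I"
    | SRequest _  -> "R"
    | SPromised _ -> "P"
    | SLead _     -> "L"
  in
  role ^ string_of_int (int_of_n a._n)

let state_label state =
  String.concat "" (List.map agent_label state.ags)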
After the delivery of the {Prepare;1->2;n=1} message, a prepare from agent 1 to agent 2, the second agent halts. Likewise for the other prepare message. This looks ok, so let’s move on.
Mark1
Let’s build an agent implementation that covers the happy path.
module Mark1 = (struct

  let prepare_when_idle source n agent =
    let an = agent._n in
    if n > an
    then
      let pv = None in
      let agent' = {agent with state = SPromised pv; _n = n;} in
      let msg = {n = n; s = agent.id; t = source; k = Promise pv} in
      let out = [msg] in
      agent', out
    else
      halt agent, []

  let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent =
    let pv, pballot = proposal in
    if mn = agent._n
    then
      let pballot' = Ballot.register_vote pballot source vo in
      if Ballot.have_majority pballot'
      then
        let value = Ballot.pick_value pballot' in
        let outstanding_acks = Ballot.quorum pballot' - 1 in
        let me = agent.id in
        let targets = others agent in
        let make_msg t = {n = mn; s = me; t; k = Accept value} in
        let broadcast = List.map make_msg targets in
        let agent' = { agent with
                       store = (mn, value) :: agent.store;
                       state = SLead (value, outstanding_acks);
                     }
        in
        agent', broadcast
      else
        agent, []
    else
      halt agent, []

  let handle_accept m v agent =
    match agent.state with
    | SPromised vo when agent._n = m.n ->
      let agent' = {agent with
                    state = SIdle;
                    store = (m.n, v) :: agent.store;}
      in
      let out = [{n = m.n; s = agent.id; t = m.s; k = Accepted}] in
      agent', out
    | _ -> halt agent, []

  let handle_accepted m agent =
    match agent.state with
    | SLead (v, out) when agent._n = m.n ->
      let out' = out - 1 in
      let state' = if out' = 0 then SIdle else SLead (v, out') in
      {agent with state = state'}, []
    | _ -> halt agent, []

  let handle_message agent m =
    match m.k with
    | Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
    | Promise vo ->
      begin
        match agent.state with
        | SRequest p -> promise_for_request m.s m.n vo p agent
        | _ -> halt agent, []
      end
    | Accept v -> handle_accept m v agent
    | Accepted -> handle_accepted m agent
    | _ -> halt agent, []

end : ALGO)
What does this do? (click on it to see the full picture)
The good news is that there are quite a number of paths from the initial state I0I0I0 that reach our happy state I1I1I1, but there are also a lot of scenarios that end up in bad states.
Let’s look at one in detail.
R1I0I0:{Prepare;1->3;n=1}
  ---> R1I0P1:{Promise;3->1;n=1}
  ---> L1I0P1:{Accept;1->2;n=1}
  ---> L1H0P1
What happened here? A Prepare message goes from agent 1 to agent 3, and that agent sends a Promise back.
This causes agent 1 to become a leader and broadcast Accept messages. One of these reaches agent 2, which is clueless as it did not receive a Prepare message first. Agent 2 therefore halts.
The diagram allows us to understand scenarios that lead to bad states, and to modify the algorithm accordingly. This process of finding holes in your algorithm, patching them and iterating is something I call caulking, in absence of a better word. In this particular case, an agent that is Idle can receive an Accept for the next n, and should be able to accept it and be Idle at that next n.
What about dropped messages?
Earlier, I did not answer the question about simulating dropped messages. The above scenario should make clear that we are actually in luck: there is no difference between that scenario and one where the Prepare from agent 1 to agent 2 was dropped. In general, there is no difference between dropping a message and delaying it until it is no longer relevant. This means there is no need to simulate dropped messages at all!
Mark2
Let’s caulk Mark1. Looking at the diagram, not a lot of things need to be fixed. Here’s a list of messages that go awry.
- Accept;n when agent is Idle at pred n
- Accepted;n when agent is already Idle at n
- Promise;n when agent is already Leading at n
- Promise;n when agent is already Idle at n
Ok, adapting the code is easy:
module Mark2 = (struct

  let prepare_when_idle source n agent =
    let an = agent._n in
    if n > an
    then
      let pv = None in
      let agent' = {agent with state = SPromised pv; _n = n;} in
      let msg = {n = n; s = agent.id; t = source; k = Promise pv} in
      let out = [msg] in
      agent', out
    else
      halt agent, []

  let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent =
    let pv, pballot = proposal in
    if mn = agent._n
    then
      let pballot' = Ballot.register_vote pballot source vo in
      if Ballot.have_majority pballot'
      then
        let value = Ballot.pick_value pballot' in
        let outstanding_acks = Ballot.quorum pballot' - 1 in
        let me = agent.id in
        let targets = others agent in
        let make_msg t = {n = mn; s = me; t; k = Accept value} in
        let broadcast = List.map make_msg targets in
        let agent' = { agent with
                       store = (mn, value) :: agent.store;
                       state = SLead (value, outstanding_acks);
                     }
        in
        agent', broadcast
      else
        agent, []
    else
      halt agent, []

  let handle_accept m v agent =
    let _accept m =
      let agent' = {agent with
                    state = SIdle;
                    store = (m.n, v) :: agent.store;}
      in
      let out = [{n = m.n; s = agent.id; t = m.s; k = Accepted}] in
      agent', out
    in
    match agent.state with
    | SPromised vo when agent._n = m.n -> _accept m
    | SIdle when (next agent._n) = m.n -> _accept m
    | _ -> halt agent, []

  let handle_accepted m agent =
    match agent.state with
    | SLead (v, out) when agent._n = m.n ->
      let out' = out - 1 in
      let state' = if out' = 0 then SIdle else SLead (v, out') in
      {agent with state = state'}, []
    | SIdle when agent._n = m.n -> agent, []
    | _ -> halt agent, []

  let handle_message agent m =
    match m.k with
    | Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
    | Promise vo ->
      begin
        match agent.state with
        | SRequest p -> promise_for_request m.s m.n vo p agent
        | SLead (v, out) when agent._n = m.n -> agent, []
        | SIdle when agent._n = m.n -> agent, []
        | _ -> halt agent, []
      end
    | Accept v -> handle_accept m v agent
    | Accepted -> handle_accepted m agent
    | _ -> halt agent, []

end : ALGO)
Isn't it nice that fixing the holes in our algorithm actually makes the diagram smaller? Since we no longer end up in bad states, there are far fewer transitions. It's also aesthetically pleasing that Graphviz shows all arrows going from left to right, meaning there are no transitions that increase the distance between the current state and the state we're aiming for.
What about agents that are wiped clean?
This kind of calamity is not too difficult to simulate. Basically it’s a move that puts the agent back in its starting state. Let’s add the possibility that one of the agents is wiped.
let generate_moves state =
  if is_bad state
  then []
  else
    let deliver = List.map (fun x -> DeliverMsg x) state.net in
    let id3 = Id 3 in
    let agent = find_agent state.ags id3 in
    let wipe = if is_halted agent then [] else [Wipe id3] in
    deliver @ wipe
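The post doesn't show the rest of the plumbing, but presumably the move type gains a constructor and execute_move gets an extra case; a sketch of what that could look like (the helper below is an assumption):

(* the move type gains a Wipe constructor ... *)
type 'v move =
  | DeliverMsg of 'v message
  | Wipe of id

(* ... and execute_move resets the wiped agent to its starting state *)
let execute_wipe state id =
  let agent  = find_agent state.ags id in
  let agent' = make_agent id agent.pop in
  { state with ags = replace_agent agent' state.ags }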
Let’s try that…
./paxos.byte > mark3.dot
bad path:
1 ---(1: Prepare) ---> 3
3 ---(1: Promise) ---> 1
1 ---(1: Accept) ---> 2
1 ---(1: Accept) ---> 3
Wipe 3
L1I0I0
Ouch. There actually is something wrong here. As it turns out, there is a bug in the Mark2 module.
It’s this fragment that’s wrong:
let handle_accept m v agent =
  let _accept m =
    let agent' = {agent with
                  state = SIdle;
                  store = (m.n, v) :: agent.store;}
    in
    let out = [{n = m.n; s = agent.id; t = m.s; k = Accepted}] in
    agent', out
  in
  match agent.state with
  | SPromised vo when agent._n = m.n -> _accept m
  | SIdle when (next agent._n) = m.n -> _accept m
  | _ -> halt agent, []
Handling an Accept when the agent is in the Idle state should also set the agent's n correctly (to the n of the message). Let's fix that and try again.
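A sketch of what the fix could look like (the corrected code is in the repo; only the extra _n assignment differs from the fragment above):

let handle_accept m v agent =
  let _accept m =
    let agent' = {agent with
                  state = SIdle;
                  _n = m.n;                       (* take over the message's n *)
                  store = (m.n, v) :: agent.store;}
    in
    let out = [{n = m.n; s = agent.id; t = m.s; k = Accepted}] in
    agent', out
  in
  match agent.state with
  | SPromised vo when agent._n = m.n -> _accept m
  | SIdle when (next agent._n) = m.n -> _accept m
  | _ -> halt agent, []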
Here is the resulting diagram:
Again, we're confronted with lots of bad states, but we know how to fix that. What are the scenarios that bring us to bad states? As it turns out, they are all caused by old Prepare messages. Adding the handling for those and generating the diagram again yields:
Indeed, wiping an agent moves the system to a state somewhere to the left of the current state, which matches the idea of being further away from our goal.
Are we there yet?
So far, we only addressed cases where there is a single agent in a requesting state. So what would happen if two agents request something at the same time?
Happy caulking!
Closing Remarks
Arriving at a correct implementation of an algorithm is difficult, but even more so in the case of distributed systems. This blog post shows a strategy you can apply in caulking your own implementations. As stated before, making your implementation pure helps you a lot.
Have fun,
Romain.