This is just a "note to self" or cheatsheet...
I'm working on the new distributed consensus algorithm about to appear in Cassandra 5.0. I've noticed that I frequently lose momentum when I have to go back to the formal definition of the algorithm, because I was never good at reading mathematical notation with a lot of Greek letters...
So, let's translate the Greek letters to English words:
tau = a transaction = trx or trx1
gamma = a concurrent, typically conflicting, transaction = trx2, trx3, ...
The following Latin letters will also be spelled out as full words:
C = Coordinator
p = process = node = replica
The following Latin letters are so common, we can keep using them:
t0 = Initial transaction id / timestamp for the transaction
t0_trx2 = The above for another transaction, trx2
T = Final transaction id / timestamp. For transactions with no conflicts, T = t0. The way to resolve conflicts is to set T > T_trxN > t0 for every conflicting trxN.
t = omitted here, it's the same as T
Here we go:
Algorithm 1: Consensus protocol
- t0 = (now, 0, Coordinator node id)
- Send PreAccept( trx1, t0 ) to all nodes that hold primary keys (aka rows) that are part of trx1
Receive PreAccept( trx1, t0 ) on such a node p
- if t0_trx1 > T_trxN for all conflicting trx2, trx3... then
- T_trx1 = t0_trx1 (no conflicts with concurrent transactions, you are good to go with the timestamp initially chosen)
- else
- T_max = the largest T_trxN seen in the check above
- T_trx1 = ( T_max[0], T_max[1] + 1, node id of this node/shard )
- end if
- (omitted)
- PreAccepted[trx1] = true
- reply PreAcceptOK( T_trx1, Dependencies[trx1] = { trx2, trx3... that intersect with the keys (rows) of trx1 AND t0_trxN < t0_trx1 } )
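As a minimal Python sketch of the PreAccept handler above (all names are mine, not Accord's; timestamps are plain (wall clock, logical, node id) tuples compared lexicographically):

```python
from dataclasses import dataclass

@dataclass
class Trx:
    id: str
    keys: set        # primary keys (rows) touched by the transaction
    t0: tuple        # initial timestamp: (wall_clock_ms, logical, node_id)
    T: tuple = None  # final timestamp, once known

def pre_accept(node, trx1):
    """Handle PreAccept(trx1, t0) on a replica. `node` is a hypothetical
    object with .node_id, .known (transactions seen so far, each with a
    proposed T already) and .pre_accepted."""
    # Conflicts: anything seen here that touches one of trx1's keys.
    conflicts = [g for g in node.known if g.keys & trx1.keys]

    if all(trx1.t0 > g.T for g in conflicts):
        # Fast path candidate: t0 already sorts after every conflict.
        T_trx1 = trx1.t0
    else:
        # Bump past the largest conflicting T, tie-broken by this node's id.
        T_max = max(g.T for g in conflicts)
        T_trx1 = (T_max[0], T_max[1] + 1, node.node_id)

    node.pre_accepted[trx1.id] = True
    # Dependencies: conflicting transactions that started before trx1.
    deps = {g.id for g in conflicts if g.t0 < trx1.t0}
    return ("PreAcceptOK", T_trx1, deps)
```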
Receive PreAcceptOK( T_trx1, dependencies ) from each node p that owns keys touched by trx1
- Dependencies[trx1] = UNION( dependencies from each PreAcceptOK reply )
- if a Quorum of nodes returned PreAcceptOK() with T_trx1 == t0_trx1 then
- send Commit( trx1, T_trx1 = t0, t0, Dependencies[trx1] ) to all nodes p that participate in trx1.
- Go to Execution protocol (Algorithm 2)
- else
- T_trx1 = max( T_trx1 from each PreAcceptOK reply )
- send Accept( trx1, t0, T_trx1, Dependencies[trx1] ) to all nodes p that participate in trx1.
- end if
Receive Accept( trx1, t0, T_trx1, Dependencies[trx1] ) on each node p that participates in trx1
- T_trx1 = the T_trx1 from the Accept message above
- Accepted[trx1] = true
- Reply AcceptOK( Dependencies[trx1] = { trx2, trx3... that intersect with the keys (rows) of trx1 AND t0_trxN < T_trx1 } )
Receive AcceptOK ( Dependencies[trx1] ) on coordinator node, from a Quorum of trx1 participants
- Dependencies[trx1] = UNION( Dependencies from the replies above )
- send Commit( trx1, t0_trx1, T_trx1, Dependencies[trx1] ) to all nodes p that participate in trx1.
- go to Execution protocol
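And the coordinator's fast/slow path decision from the PreAcceptOK replies, with the same caveats (message shapes are stand-ins of mine); the fast-path quorum size is discussed in the next section:

```python
# Hypothetical coordinator logic after Algorithm 1's PreAccept round;
# `replies` is a list of (T_trx1, deps) pairs from the participants.

def decide_path(trx1, t0, replies, fast_quorum):
    deps = set().union(*(d for _, d in replies))

    if sum(1 for T, _ in replies if T == t0) >= fast_quorum:
        # Fast path: enough nodes kept the original timestamp, T = t0.
        return ("Commit", trx1, t0, t0, deps)
    # Slow path: adopt the highest proposed T and run an Accept round.
    T = max(T for T, _ in replies)
    return ("Accept", trx1, t0, T, deps)
```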
About quorums
The slow path commits use a good old majority quorum.
Example: In a 9 node cluster, 5 nodes need to acknowledge a commit in order to successfully commit it.
The fast path offers more variety in options. The thinking here is that the first step toward good performance with Accord is to design your system such that the fast path is used a lot, and the slow path as little as possible. From there it makes sense that all efforts to shave off another millisecond or two have been put into the fast path.
A good starting point for understanding the fast path is to realize that you can always choose a configuration that reduces the system to a simple majority-based decision. Hence...
Example: In a 9 node cluster, at least 5 nodes must agree that a fast path decision is possible.
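In code, this default decision is just the plain majority, for both paths:

```python
def majority(n: int) -> int:
    # e.g. 9 nodes -> 5, matching both examples above
    return n // 2 + 1
```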
And now... For the optimizations:
- In the scheme presented so far, you would expect the fast path to work for any transaction where there is no conflict/overlap of keys between two simultaneous transactions. But in a geographically distributed cluster, each node will see transactions that originated near itself first. If we have trx1 coming from DC1, and trx2 from DC2, then the nodes in DC1 will see trx1 way before trx2 arrives, at which point they will tell trx2 that it is conflicting with another transaction that is already in flight. BUT... in DC2 the exact same thing happens the other way around. As a result, neither transaction can choose the fast path.
The re-order buffer is a simple fix to the above: transactions are assigned a timestamp that is slightly in the future, and each node considers that time to be the time the transaction "arrives" at the node. As a result, all transactions "arrive" at the same time on all nodes, and the transaction (trx1) can be committed in the fast path, because all other in-flight transactions are clearly either before or after trx1. (See the sketch after this list.)
- Borrowing a page from Flexible Paxos, Accord also supports using smaller quorums than the typical 5 out of 9 majority. This comes in two forms:
- First, we can define an "electorate", that is a subset of all nodes in the cluster. The nodes not in this subset are basically ignored for the purposes of finding a quorum for the fast path phase. Those nodes can fail or work fine, it doesn't really matter. They are like followers in a leader based system (aka slaves in master-slave replication)
The reason you might want to do this is that if you have nodes on multiple continents, including high-latency ones like Australia, you can omit those from the set of nodes you need to wait for to complete the commit.
Example: A 9 node cluster has its nodes distributed over 3 data centers: us-west-1, us-west-2, eu-central-1. The client is in us-west-1, and the network latency to each DC is 4 ms, 23 ms and 153 ms, respectively.
The fast path electorate includes only 3 nodes in us-west-1, and 2 in us-west-2. This means you can get a fast commit in 4 ms, and reach all nodes that are in the electorate in 23 ms.
- You can trade off performance against availability/robustness, as long as you satisfy:
2 x Nodes required for fast path quorum >= Nodes in electorate + max failed nodes + 1
- Example: For a 9 node cluster, the fast path electorate is 5 nodes, and the client only needs to get a response from 3 nodes. This could be the 3 nodes in us-west-1, so the RTT for a commit is now down to 4 ms! The trade-off is that as soon as one node fails, or even just restarts for maintenance, fast path commits are no longer available. (f=0!) However, note that if fast path commits are unavailable, the algorithm simply falls back to the slow path quorum, which is guaranteed to succeed.
- Example: For a 9 node cluster, the fast path electorate is 5 nodes, and the client needs 4 responses for a fast path quorum. The latency to commit is now back to 23 ms, but the system can now tolerate one node failing without any impact on the availability of the fast path. (In total the system can of course still tolerate 4 nodes out of 9 failing.)
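Here is the re-order buffer sketch promised above. Everything in it is an assumption of mine for illustration: the buffer size, the function names, and the blocking wait (a real implementation would schedule rather than sleep):

```python
import time

REORDER_BUFFER_MS = 10  # assumed cross-DC skew budget, not a real Accord knob

def new_t0(node_id):
    # Stamp the transaction slightly into the future; that future instant
    # is the moment the transaction "arrives" everywhere.
    return (int(time.time() * 1000) + REORDER_BUFFER_MS, 0, node_id)

def on_receive(node, trx, process):
    # A replica buffers the transaction until its timestamp has passed, so
    # replicas in different DCs see concurrent transactions in one order.
    delay = trx.t0[0] / 1000.0 - time.time()
    if delay > 0:
        time.sleep(delay)  # a real system would schedule, not block
    process(node, trx)
```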
- Reconfiguring Electorates (Section 5)
Flexible Paxos is often presented as a trade-off where the user has to choose a static system configuration that balances low latency against some fault tolerance, typically by splitting the difference: in a 9 node cluster, choose |E| = 7 and require 5 nodes for quorum, leaving 2 nodes worth of fault tolerance.
The Accord paper introduces a different mindset: since Accord needs to support re-configuring the cluster topology anyway, it will also support easy reconfiguration of the flexible quorum. This allows the Accord user to have their cake and eat it too: it's feasible to set E and F to the smallest possible values, and when some nodes eventually fail, simply reconfigure E and F to values that allow the system to keep benefiting from the fast path.
A table that summarizes possible combinations of E, F and f:
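Under my reading of the constraint above (2 x F >= |E| + f + 1, which is consistent with every example in this post), such a table can be regenerated with a few lines of Python:

```python
# For a 9 node cluster: for each electorate size |E| and fast path quorum
# size F (at least a majority of E), print the largest failure budget f
# allowed by the constraint 2*F >= |E| + f + 1.
N = 9
print(f"{'|E|':>4} {'F':>3} {'max f':>6}")
for e in range(1, N + 1):
    for fq in range(e // 2 + 1, e + 1):
        f = 2 * fq - e - 1
        if f >= 0:
            print(f"{e:>4} {fq:>3} {f:>6}")
# |E|=5, F=3 -> f=0 and |E|=7, F=5 -> f=2, as in the examples here.
# (The |E|=5, F=4 example above uses f=1, within this bound.)
```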
This talk by Benedict explains in great detail - and with pictures! - how all of this works. By the way, thanks Benedict for also helping me understand enough to write this post!
Algorithm 2: Execution protocol
By the time we get this far, the remaining work is straightforward. All transactions that enter this stage are already known to be committed: they have a transaction id which defines an ordering between them, and they are guaranteed not to conflict with each other (as long as they are executed in that order). So what remains is basically to apply the queue of transactions, simple as that.
Although the Accord white paper doesn't reference it, this high-level structure is actually similar to Galera Replication. Galera has been the de facto HA solution in the MySQL world for a decade, and has been my favorite among distributed consensus algorithms. So this is a promising start at least!
On Coordinator:
- For each partition (aka shard) p:
- Collect the dependencies relevant to partition p into Dependencies[trx1, p]
- Execute all reads of the transaction first: Read( trx1, T_trx1, Dependencies[trx1, p] )
- end for
- Not sure why the Accord white paper places "receive Commit()" here. Maybe to make the point that the coordinator can proceed to the execution protocol without waiting for other nodes to acknowledge the commit. What is always fun with distributed consensus algorithms is trying to spot the "point of no return": the point in the protocol after which a recovery protocol will treat the transaction as committed rather than aborting it. It is way earlier than this step. Can you spot it?
On a partition p that receives the above Read():
- Wait for all Dependencies[trx1, p] to be committed first
- Then wait for all dependencies for which T_trxN < T_trx1 to be applied. (Note that we only ever cared about conflicting transactions whose timestamp is smaller. But we only know the final timestamps after the commit in the previous step has happened.)
- Execute the Read( trx1, T_trx1, Dependencies[trx1, p] ) sent by the coordinator above
- Reply to the coordinator with ReadOK( read results )
On coordinator, receive ReadOK() from each shard:
- Unclear to me what this row does. I always thought it's just a summary of the next two lines, but maybe not?
- Apply writes: send Apply( trx1, T_trx1, Dependencies[trx1, p], read results ) to each partition. Note that syntax for conditional logic exists, so writes may or may not get executed depending on the read results.
- Send read results to the client. (Note that even if the writes have not been applied on the other nodes yet, they are in fact committed. Hence, this reply implicitly also tells the client that the transaction has completed.)
On a partition p, receive above Apply():
- Wait for all Dependencies[trx1, p] to be committed first. (Yes, this is redundant with the same wait in the Read step. But note that there are several reasons why a particular partition might still need it: the transaction might be write-only, or might not have any reads on this particular partition. This matters especially since Accord allows reads to execute on only one partition, so even in the best case only a third of the partitions need to execute the Read step.)
- Then wait for all dependencies for which T_trxN < T_trx1 to be applied.
- apply( writes, T_trx1 )
- trx1 is now applied!
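A sketch of the replica side of Algorithm 2, covering both the Read and Apply handlers above. The two-stage dependency wait is the interesting part; the node object, events and storage calls are stand-ins of mine:

```python
# Hypothetical replica-side execution (Algorithm 2). `node.committed` and
# `node.applied` map transaction ids to threading.Event-like objects, and
# `node.T` maps ids to final timestamps once committed.

def wait_for_deps(node, trx1, deps_p):
    # 1) Wait until every dependency is committed, so its final T is known.
    for dep in deps_p:
        node.committed[dep].wait()
    # 2) Then wait until every dependency ordered before trx1 is applied.
    for dep in deps_p:
        if node.T[dep] < trx1.T:
            node.applied[dep].wait()

def handle_read(node, trx1, deps_p):
    wait_for_deps(node, trx1, deps_p)
    return ("ReadOK", node.storage.read(trx1.keys))

def handle_apply(node, trx1, deps_p, writes):
    wait_for_deps(node, trx1, deps_p)
    node.storage.apply(writes, trx1.T)
    node.applied[trx1.id].set()  # trx1 is now applied
```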
Algorithm 3: Recovery protocol
The recovery protocol introduces the ballot. This is essentially a version or retry attempt of the same transaction. It is used to override the previous coordinator(s), which have essentially timed out if we enter the recovery protocol. The recovery coordinator overrides the original transaction coordinator by picking a higher ballot number.
The ballot number is therefore actually present in all the previous algorithms as well; it's just omitted for brevity.
The definition of a weak failure detector:
Weak completeness: Eventually every process that has failed is permanently suspected by some correct process.
Note that the main distinction here lies in which correct processes detect a failure.
For example, a node that is part of trx1 may set a timer based on the maximum transaction duration. When such a timer fires, it may then assume that the coordinator for that transaction has disappeared or is stuck. To ensure that all transactions are committed or aborted in due time, it can then take over the transaction by using a higher ballot number than any already observed.
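With an assumed duration bound and hypothetical helper names, a participant could arm such a takeover timer like this:

```python
import threading

MAX_TRX_DURATION = 5.0  # seconds; an assumed bound, not Accord's real value

def arm_recovery_timer(node, trx1):
    # Suspect the coordinator if trx1 has not committed within the maximum
    # transaction duration, then take over with a ballot higher than any
    # this node has observed for trx1.
    def maybe_recover():
        if not node.is_committed(trx1.id):
            b = node.max_ballot.get(trx1.id, 0) + 1
            node.start_recovery(b, trx1)  # act as recovery coordinator
    threading.Timer(MAX_TRX_DURATION, maybe_recover).start()
```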
On a Recovery Coordinator:
- Set b = "higher than what is already observed in the transaction" (typically >0)
- send Recover(b, trx1, t0) to all partitions part of trx1
Receive Recover(...) on partition p:
- if b <= MaxBallot_trx1
- reply NACK # TODO: verify I'm interpreting this correctly wrt MaxBallot_tau vs b_tau; no idea what the second one is
- else
- MaxBallot_trx1 = b
- Accepts = accepted transactions that conflict on one or more keys of trx1 and don't have trx1 as a dependency
- Commits = the same, but in committed state
- Wait = subset of Accepts where t0_trxN < t0_trx1 AND T_trxN > t0_trx1
- Superseding = Accepts where t0_trxN > t0_trx1, UNION Commits where T_trxN > t0_trx1
- if PreAccepted[trx1] then
- re-run the PreAccept handler steps of the Consensus protocol (Algorithm 1), recomputing T_trx1 and Dependencies[trx1]
- end if
- if NOT Accepted[trx1] AND NOT Committed[trx1] AND NOT Applied[trx1]
- Dependencies[trx1] = { trxN | t0_trxN < t0_trx1 AND trxN conflicts with trx1 }
- end if
- reply RecoverOK( *trx1, Superseding, Wait )
- end if
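The replica side of Recover(), sketched with my own names. Note the Superseding inequality follows my reading above (t0_trxN > t0_trx1 for Accepts), and the transaction objects are assumed to carry their dependency ids in g.deps:

```python
def handle_recover(node, b, trx1):
    # Ballot check: an equal or higher ballot already owns trx1, so tell
    # the competing coordinator to back off.
    if b <= node.max_ballot.get(trx1.id, 0):
        return ("NACK",)
    node.max_ballot[trx1.id] = b

    # Conflicts that do not list trx1 among their dependency ids (g.deps).
    conflicts = [g for g in node.known
                 if g.keys & trx1.keys and trx1.id not in g.deps]
    accepts = [g for g in conflicts if node.is_accepted(g.id)]
    commits = [g for g in conflicts if node.is_committed(g.id)]
    # Accepted conflicts that started before trx1 but may finish after it.
    wait = [g for g in accepts if g.t0 < trx1.t0 < g.T]
    # Conflicts proving trx1 cannot have taken the fast path.
    superseding = ([g for g in accepts if g.t0 > trx1.t0] +
                   [g for g in commits if g.T > trx1.t0])
    return ("RecoverOK", node.state_of(trx1.id), superseding, wait)
```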
Receive NACK() on the Recovery Coordinator:
- yield to the competing coordinator
Receive RecoverOK( * trx1, Superseding, Wait ) from all nodes (partitions) that participate in trx1
- if any of the partitions have advanced to Applied status, then
- send response(result) to the client
- send Apply ( trx1, t0, t for the partition that is Applied, trx1 dependencies from the partition that was applied, result )
- else if any of the partitions have advanced to the Committed[trx1] status, then:
- send Commit( trx1, t0, t for the partition, trx1 dependencies from the partition, result )
- go to Execution protocol
- else if any of the partitions have advanced to Accepted[trx1] status
- select partition with highest (accepted) ballot nr
- t = t from the partition that was in Accepted[trx1]
- dependencies = dependencies from that partition
- else
- t = t0
- Dependencies[trx1] = UNION of dependencies on all partitions
- if more than |E| - |F| partitions have T_trx1 > t0_trx1 then
- t = max(t returned from each partition)
- else if any partition has Superseding transactions for trx1, then
- t = max(t returned from each partition)
- else if any partition has transactions in Wait status, then
- await all transactions trxN that are in Wait on any partition to become Committed
- restart Recovery Protocol
- end if
- go to Consensus Protocol
- end if
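The decision tree above boils down to adopting the most advanced state any partition reached. A hedged sketch, with reply objects assumed to carry status, t, deps, ballot and (if any) result:

```python
# Hypothetical recovery decision on the recovery coordinator, from the
# RecoverOK replies of all partitions participating in trx1.

def recover_decision(trx1, t0, replies):
    def by_status(s):
        return [r for r in replies if r.status == s]

    if by_status("Applied"):
        r = by_status("Applied")[0]
        # The coordinator also sends response(r.result) to the client.
        return ("Apply", trx1, t0, r.t, r.deps, r.result)
    if by_status("Committed"):
        r = by_status("Committed")[0]
        return ("Commit", trx1, t0, r.t, r.deps)
    accepted = by_status("Accepted")
    if accepted:
        r = max(accepted, key=lambda x: x.ballot)
        return ("Accept", trx1, t0, r.t, r.deps)  # back to consensus
    # Otherwise fall back to t0, subject to the Superseding / Wait checks.
    return ("PreAccept", trx1, t0)
```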
Interesting approach.
I have been away from this for a little while, so I have some catching up to do.
I'm glad to see that Galera is making its way into the Cassandra space.
Once I've caught up on Cassandra, maybe I'll have a more meaningful response.
Good work.
Celest