【笔记】Raft 与 MIT 6.824

/ 0评 / 0

Raft基本概念

Server总数为单数;

每个Server有3种状态:Leader(领袖)、Candidate(候选人)、Follower(跟随者)

Leader通过发送心跳来保证自己的地位,一旦Leader崩溃,Follower在检测不到心跳后就会开启选举试图成为Leader;

Raft将时间划分为多个Term(任期),每次选举将开始新的Term,选举实现投票制第一个Term为1;

Raft在所有Server保存Log(日志),Log由LogEntry(日志项)组成,并由Leader同步所有Server的Log(内容相同、顺序一致);

每个LogEntry附带Term和Index,,Index代表LogEntry在Log中的位置,第一个Index为1

当Leader已经将某个LogEntry传输到大多数(大于总数的一半,下同)Server上,则该LogEntry视为已提交(Commited)

当Server将某个已提交的LogEntry中的内容执行并持久化,则该LogEntry视为已应用(Applied)

每一个Follower在任何一个Term中最多只能投一票,按先到先得原则,Candidate总是为自己投票;

Follower通过开启选举变为Candidate,Candidate在获取大多数投票后变为Leader;

每一次选举要么产生唯一的Leader,要么不产生Leader;

Follower不主动发送RPC,仅对接收到的RPC进行响应;

每个Server保存一个CurrentTerm,一旦观察到其他Server具有更大的Term就更新CurrentTerm;

如果Leader和Candidate观察到其他Server具有更大的Term,它更新CurrentTerm并变回Follower;

所有Server拒绝任何Term < CurrentTerm的RPC;


Raft的安全性由以下性质保证:

Election Safety:在任何一个Term中最多只有一个Leader。

在某个Term中所有Candidate可能都无法获得大多数投票,因此该Term将不产生Leader。

Leader Only-Append:Leader只追加LogEntry,而不覆盖或删除LogEntry。

Candidate和Follower根据需要可以覆盖或删除LogEntry。

Log Matching:如果两个LogEntry的Term和Index相同它们就是同个LogEntry。

如果在不同的Log中的两个LogEntry的Term和Index相同,那么这两个LogEntry保存相同的内容。

如果在不同的Log中的两个LogEntry的Term和Index相同,那么这两个Log在该LogEntry之前的所有LogEntry完全相同。

Leader Completeness:如果某个LogEntry被某个Leader提交,那么在未来Term的Leader必定拥有该LogEntry。

未来Term的Leader必定拥有该LogEntry,但不一定已提交该LogEntry。

State Machine Safety:如果某个Server应用了某个LogEntry,那么其他Server不可能在相同的Index上应用不同的LogEntry。

这保证了所有Server最终将以相同的顺序应用相同的LogEntry。


Server之间通过RPC进行沟通,Raft只需要两种类型的RPC

RequestVoteRPC:由Candidate向其他所有Server发送,用于选举拉票。

请求参数:termCandidate的CurrentTerm,candidateId:Candidate的ID,lastLogIndex:Candidate最后一个LogEntry的Index,lastLogTerm:Candidate最后一个LogEntry的Term

响应参数:term:响应者的CurrentTerm,voteGranted:响应者是否为其投票

AppendEntriesRPC:由Leader向其他所有Server发送,用于心跳和LogEntry传输。

请求参数:term:Leader的CurrentTerm,leaderId:Leader的ID,prevLogIndex:所传输的LogEntry的前一个Index,prevLogTerm:所传输的LogTerm的前一个Term,entries:所传输的LogEntry,LeaderCommit:Leader所提交的最后一个LogEntry的Index

响应参数:term:响应者的CurrentTerm,success:响应者当前的Log是否匹配prevLogIndex和prevLogTerm


Raft选举

Leader通过不带LogEntry的AppendEntriesRPC发送心跳,当Follower在一段时间(选举超时ElectionTimeout)内未收到心跳后将开启选举,变为Candidate。

一旦Follower变为Candidate,它自增CurrentTerm,并立刻向所有其他Server发送RequestVoteRPC:

一旦Candidate获得超过大多数的选票,它变为Leader,并通过AppendEntriesRPC发送心跳;

一旦Candidate收到来自其他Server的AppendEntriesRPC,并且Term >= CurrentTerm,则Candidate停止选举并变回Follower;

一旦Candidate收到来自其他Server的AppendEntriesRPC,但Term < CurrentTerm,则Candidate拒绝RPC并继续选举;

如果Candidate未能获取超过一半的选票,同时也未能收到来自其他Server的AppendEntriesRPC(且Term >= CurrentTerm),那么它重新计时并在超时后再次开启选举。

即便开启了选举,该Term最终不一定能产生Leader。在一个Term中如果多个Server在短时间内同时变为Candidate,由于Candidate总是为自己投票,因此剩余的票数可能无法超过总数的一半,此时将无法产生Leader,所有Candidate将在超时后重新开启新的一轮选举(因此开启了新的Term),ElectionTimeout在一定范围内随机化,以防止此情况重复发生。

各个角色的行为如下:

Follower

拒绝所有Term < CurrentTerm的RPC;

接收来自Leader的AppendEntriesRPC,每次心跳将重置选举计时,如果Tem > CurrentTerm那么更新CurrentTerm并刷新投票机会;

接收来自Candidate的RequestVoteRPC,按先来先到原则投票,每个Term最多只能投一票;

如果Term > CurrentTerm,那么无论过去的Term是否已投票,都更新CurrentTerm并直接投票;

如果Term == CurrentTerm并且已投票,此时不能立即拒绝投票,因为Follower可能正是为这个Candidate投过票,只是因为网络问题收到了来自它的两个相同的RequestVoteRPC,故Follower需要记录自己过去为谁投票,并以此作出响应;

一旦投票成功,重置选举计时。

一旦ElectionTimeout超时,开启选举,变为Candidate。

Candidate

拒绝所有Term < CurrentTerm的RPC;

向所有其他Server发送RequestVoteRPC,并接收响应,如果Term > CurrentTerm那么停止选举,更新CurrentTerm并变回Follower;

由于网络延迟,接收到响应时CurrentTerm可能已经改变,此时忽略来自过去的Term的所有响应;

一旦CurrentTerm已经改变,说明Candidate已经变回Follower,此时停止选举。

如果RequestVoteRPC的响应票数超过一半,那么变为Leader,并立即通过AppendEntriesRPC发送心跳;

接收来自Leader的AppendEntriesRPC,如果Term >= CurrentTerm那么停止选举,更新CurrentTerm、刷新投票机会并变回Follower;

接收来自其他Candidate的RequestVoteRPC,如果Term > CurrentTerm那么停止选举,更新CurrentTerm变回Follower并直接投票;

接收来自其他Candidate的RequestVoteRPC,但Term == CurrentTerm,此时拒绝投票,并继续选举。

Leader

拒绝所有Term < CurrentTerm的RPC;

向所有其他Server发送AppendEntriesRPC,并接收响应,如果Term > CurrentTerm那么更新CurrentTerm并变回Follower;

由于网络延迟,接收到响应时CurrentTerm可能已经改变,此时忽略来自过去的Term的所有响应;

一旦CurrentTerm已经改变,说明Leader已经变回Follower,此时停止发送AppendEntriesRPC。

接收来自其他Leader的AppendEntriesRPC,如果Term > CurrentTerm那么更新CurrentTerm并变回Follower;

理论上不可能接收到来自其他Leader的AppendEntries且Term == CurrentTerm,这是因为每个Term最多只能有一个Leader。

接收来自Candidate的RequestVoteRPC,如果Term > CurrentTerm那么更新CurrentTerm,变回Follower并立即投票;

接收来自Candidate的RequestVoteRPC,但Term == CurrentTerm,此时拒绝投票,并发送心跳;

在任何一个Term中,由于Candidate需要获得超过一半的选票才能成为Leader,而每个Server在每个Term只能投一票,因此每个Term最多只能有一个Leader,这保证了Election Safety性质


Raft日志传输

Leader通过附带LogEntry的AppendEntriesRPC传输Log,所有的用户请求只能由Leader处理,当Leader接收到新的用户请求时,它生成一个LogEntry并将其加入到自己的Log中,Leader此时还不可以应用该Log(即执行用户请求的操作),Leader必须确保该LogEntry已提交才能应用,应用之后才能向用户返回成功结果,为此Leader需要向其他Server发送Log。

一旦Leader得知大多数Server拥有该LogEntry,Leader就会提交该LogEntry,并在提交后立即应用该Log并向用户返回成功结果。每个Server都会维护一个CommitIndex,用于记录自己最新提交的LogEntry的Index,注意即便Leader提交了某个LogEntry,其他Server在第一时间尚未提交该LogEntry,其他Server必须等到得知Leader已经提交了该LogEntry之后才会提交。Raft保证,Leader一旦提交并应用了某个LogEntry,最终所有Server都将提交并应用该LogEntry。

其它Server如何得知Leader提交了某个LogEntry呢?当Leader向其它Server发送AppendEntriesRPC时,它通过LeaderCommit参数附带自己的CommitIndex,一旦Server接收到RPC,判断出LeaderCommit > CommitIndex,它便可以提交LogEntry并更新CommitIndex。

如果Leader无法将LogEntry发送给大多数Server,那么该LogEntry就无法被提交并执行,用户请求就会失败,同时这些未提交的LogEntry将有可能散布在Server中,即:

R1. 由于LogEntry总是由对应Term的Leader先提交,因此Leader可以确保自己拥有当前Term的所有已提交的LogEntry,并且自己已经提交这些LogEntry;

R2. Leader可以确保已提交的LogEntry被大多数Server拥有,但无法确保已提交的LogEntry被大多数Server提交,也无法确保某个Server拥有已提交的任何某一个LogEntry;

R3. Leader可以确保自己拥有过去的所有Leader已提交的所有LogEntry,但无法确保自己已经提交这些LogEntry;

R4. Leader一旦得知某个LogEntry被大多数Server所拥有,如果LogEntry的Term == CurrentTerm,即该LogEntry是由自己发送的,Leader可以宣布该LogEntry已提交;但如果LogEntry的Term < CurrentTerm,即该LogEntry是由以前的Leader发送的,Leader无法断定该LogEntry已提交,同样所有的其它Server都不能根据LeaderCommit提交Term < CurrentTerm的LogEntry。

上述规则决定了其它Server相比于Leader来说,Log可能缺少了某些LogEntry,也可能多出来了某些LogEntry,因此需要一种机制可以让Leader向其它Server发送LogEntry,同时让这些Server与Leader达成同步。首先为了发送LogEntry,Leader将维护一个NextIndex数组、MatchIndex数组,每个Server将维护LastLogIndexLastLogTerm

LastLogIndex即自己的Log中的最后一个LogEntry的Index;

LastLogTerm即自己的Log中的最后一个LogEntry的Term;

NextIndex数组记录了Leader下一次要向其他Server发送的LogEntry的Index;

MatchIndex数组记录了Leader已知其他Server保存最新的LogEntry的Index;

Leader如果发现某个Server的NextIndex <= LastLogIndex,那么它将向Server发送LogEntry,发送的LogEntry存放在entries请求参数,一次可以发送多个LogEntry,包括从Index == NextIndex的LogEntry开始直到Index == LastLogIndex的LogEntry。一旦得到成功响应,Leader更新NextIndex和MatchIndex。

同时为了让其它Server和自己同步,需要通过AppendEntriesRPC的PrevLogIndexPrevLogTerm参数:

当Leader向其它Server发送AppendEntriesRPC时,它将PrevLogIndex设置为将发送的LogEntry的上一个LogEntry的Index,将PrevLogTerm设置为发送的LogEntry的上一个LogEntry的Term;

当某个Server接收到RPC时,它搜索自己的Log,判断是否存在某个LogEntry的Index == PrevLogIndex 且 Term == PrevLogTerm,如果存在那么删除该LogEntry之后的所有LogEntry再添加接收到的LogEntries,如果不存在说明该Server中间缺少了某些LogEntry,它作出失败的RPC响应并忽略收到的LogEntries;

当Leader接收到RPC响应时,如果成功它更新NextIndex和MatchIndex,如果失败它将回退NextIndex,并在下次向该Server发送更多的LogEntry。

依据这个过程,R1、R2是显然成立的,但R3还缺少条件,R3即是Leader Completeness性质,为了保证该性质,Raft规定了Server向其它Server投票的条件,即若ServerA想向ServerB投票,那么ServerB的Log必须至少和ServerA的Log一样新(up-to-date)。

up-to-date的定义:比较两个Log的最后一个LogEntry,Term大的Log比Term 小的Log新,如果Term相同,那么Index大的Log比Index小的Log新。

一旦考虑上述条件,,那么Leader Completeness性质就可以得到保证,逻辑如下:

假设Term1 < Term2,Term1存在Leader1,Term2存在Leader2,Leader1提交了某个LogEntry,下证Leader2一定拥有该LogEntry;

由于LogEntry已提交,因此Leader1必定已将该LogEntry传输到了大多数Server;

为了成为Leader,Leader2必须在Term2曾获得超过一半Server的选票;

接收到Leader1传输的LogEntry的Server和为Leader2投票的Server至少有一个重复的;

即至少有一个Server既拥有该LogEntry,同时为Leader2投过票;

由Raft投票条件,Leader2也必须拥有该LogEntry。

为了在投票时可以进行比较,RequestVoteRPC将附带参数LastLogIndexLastLogTerm

依据R1和R3,可以保证Leader只会追加LogEntry,而其它Server(Follower或者Candidate)可以删除LogEntry再追加(即覆盖),这即是Leader Only-Append性质

由于每个Term最多只能有一个Leader,而LogEntry最开始一定是由Leader创建的,因此Term和Index均相等的LogEntry一定是同个LogEntry,这便是Log Matching性质。

对于R4,前半句是显然的,Leader自己发送的LogEntry如果传输到了大多数Server上就可以宣布已提交,Leader可以通过MatchIndex数组来判断LogEntry是否已经传输到了大多数Server上,只需要判断MatchIndex是否有大多数元素大于等于LogEntry的Index即可,但这仅限于Leader自己发送的LogEntry,即Term == CurrentTerm的LogEntry。对于Term < CurrentTerm的LogEntry:

如上图,刚开始S1为Leader,发送了Index == 2且Term == 2的LogEntry给S2;随后S1崩溃,S5当选为Leader(S3、S4可为其投票),S5创建了Index == 2且Term == 3的LogEntry;随后S5崩溃,S1重启并再次当选Leader(S2、S3、S4可为其投票),继续发送Index == 2且Term == 2的LogEntry给S3,此时该LogEntry已经被大多数Server拥有,但S1并不能提交它,如果S1提交了该LogEntry且随后S1崩溃,S5重启并再次当选Leader(S2、S3、S4可为其投票),S5向其它Server发送的Index == 2且Term == 3的LogEntry可以覆盖掉这些Server中Index == 2且Term == 2的LogEntry,而这显然是错误的。

同时,对于不是Leader的Server来说,也不能根据LeaderCommit来提交Term < CurrentTerm的LogEntry,在上图中,即使S1没有提交Index == 2且Term == 2的LogEntry,S5当选Leader后创建Index == 2且Term == 3的LogEntry,此时即便其它Server得知CommitIndex == 2也不能提交Index == 2且Term == 2的LogEntry,因为这个LogEntry和Leader的已经不是同一个了。

那么Leader和其它Server应该如何提交Term < CurrentTerm的LogEntry呢?只有一种办法0,即当某一个Term == CurrentTerm的LogEntry已提交时,在此之前的所有LogEntry(包括Term < CurrentTerm的)必定已经提交,这是因为Raft对于LogEntry的提交是累积的。

为什么LogEntry的提交是累积的?Follower和Candidate只能通过LeaderCommit来提交LogEntry,即可以保证任何一个LogEntry总是由Leader第一个提交,而由于Leader Completeness性质,新的Leader必定拥有以前Leader所有已提交的LogEntry,因此新的Leader不会在相同的Index上提交不同的LogEntry,即所有Leader都将延续同一个已提交的LogEntry序列,该序列的长度单调递增。最后,由于所有Server都按Index的顺序提交LogEntry,因此LogEntry的提交一定是累积的。因此不难看出,所有的Server将以相同的顺序应用相同的LogEntry,即State Machine Safety性质成立。

但是即便State Machine Safety性质成立,Raft也只是保证所有的Server将以相同的顺序应用相同的LogEntry,但不能保证不会应用重复的LogEntry,即对于一次用户请求,Raft允许提交并应用相同的两条LogEntry(这里的“相同”指的是从用户角度,即内容相同,但这两条LogEntry不满足Term和Index均相等,因此从Raft的定义来说是不同的两条LogEntry)。如果用户请求的操作不具备幂等性,那么这种情况下将陷入很大的风险,因此Raft推荐由用户为每一个操作定义唯一的标识符,并在业务层面处理重复的LogEntry,以避免相同的业务操作被重复执行。

各角色的行为如下:

Follower

接收来自Leader的AppendEntriesRPC,即使entries参数不为空,也视为接收到心跳,需要重置选举超时;随后:

根据请求参数中的PrevLogIndex和PrevLogTerm判断是否为第一批LogEntry,如果是则直接添加到Log中,作出成功响应并更新LastLogIndex和LastLogTerm;

如果不是第一批LogEntry,需要根据PrevLogIndex和PrevLogTerm判断这些LogEntry是否匹配自己的Log,即搜索当前Log中Index == PrevLogIndex且Term == PrevLogTerm的LogEntry,如果存在则删除其后的所有LogEntry并添加接收到的LogEntry,作出成功响应并更新LastLogIndex和LastlogTerm,否则作出失败响应;

无论成功还是失败,接下来都需要根据请求参数中的LeaderCommit判断是否需要提交LogEntry。注意Follower可能缺少很多Log,因此即使LeaderCommit > CommitIndex最多也只能提交自己拥有的Log,并且由R4中的限制,Follower只能提交Term == CurrentTerm的LogEntry,即Follower只有在Index == Min{LeaderCommit, LastLogIndex}的LogEntry的Term == CurrentTerm的情况下才能提交CommitIndex + 1 <= Index <= Min{LeaderCommit, LastLogIndex}的所有LogEntry。一旦作出提交,立即应用所有提交的LogEntry,并更新CommitIndex。

Candidate

接收来自Leader的AppendEntriesRPC,如果Term >= CurrentTerm将变回Follower(见Raft选举),如果entries参数不为空,执行和Follower一样的行为。

Leader

接收来自其他Leader的AppendEntriesRPC,如果Term > CurrentTerm将变回Follower(见Raft选举),如果entries参数不为空,执行和Follower一样的行为。

向所有其它Server定期发送心跳,即entries参数为空的AppendEntriesRPC,其中LeaderCommit = CommitIndex,并接收响应:

如果接收到响应时已经不是Leader或者Term > CurrentTerm将变回Follower(见Raft选举);

接收到响应后,如果NextIndex <= LastLogIndex,向Server发送附带LogEntry的AppendEntriesRPC。

接收来自其它Server的附带LogEntry(entires参数不为空)的AppendEntriesRPC响应:

如果Term  == CurrentTerm且响应结果为成功,更新MatchIndex和NextIndex,最好的方法是Server在RPC响应附带自己的LastLogIndex,这样Leader可以更新NextIndex = ServerLastLogIndex + 1且MatchIndex = ServerLastLogIndex;随后检查当前Log中Term == CurrentTerm的所有尚未提交的LogEntry,是否可以在MatchIndex中找到大多数元素大于等于其Index,如果可以则提交该LogEntries,并更新CommitIndex。由于提交是累积的,因此只需从最新的LogEntry往前检查,最多提交一个即可。

如果Term == CurrentTerm且响应结果为失败,回退NextIndex,并重新发送附带LogEntry的AppendEntriesRPC;

如果Term < CurrentTerm,说明收到了过时的响应,此时的行为并不影响正确性,可以选择如下两种:

忽略所有过时的响应;

如果响应的结果为成功,依旧更新MatchIndex和NextIndex,但是不检查LogEntry是否需要提交。如果响应的结果为失败,依旧回滚NextIndex,但不重新发送附带LogEntry的AppendEntriesRPC。

上述过程中,Leader如何回滚NextIndex?由于Server的Log可能缺少某些LogEntry也可能多出来某些LogEntry,因此需要找到双方Log的匹配点,并将该点之后的Index作为NextIndex。一种简单的方法是每次都将NextIndex减一,这样迟早可以达到匹配点,但效率很慢。Server可以在响应中附带足够的信息以便让Leader快速判断出匹配点,下面给出一种可行的方法,该方法在响应中增加参数ConflictTermStartIndex

一旦Server发现接收到的LogEntry不匹配,如果PrevLogIndex > LastLogIndex,说明Server仅仅是缺少更多的LogEntry,只需置ConflictTerm = 0且StartIndex = LastLogIndex + 1

如果PrevLogIndex <= LastLogIndex,说明Server拥有多余的LogEntry,Server找到Index == PrevLogIndex的LogEntry并置ConflictTerm = LogEntry.Term,Server还需找到自己Log中该Term的第一个LogEntry,并置StartIndex = TermFirstLogEntry.Index + 1

当Leader接收到响应后,如果ConflictTerm == 0,回滚NextIndex = StartIndex即可,否则搜索Log中是否存在Term == ConflictTerm的LogEntry:

如果不存在,回滚NextIndex = StartIndex;

如果存在且LogEntry.Index < NextIndex,回滚NextIndex = LogEntry.Index;

如果存在且LogEntry.Index >= NextIndex,回滚NextIndex = StartIndex。


Raft恢复

Raft要求所有Server持久化如下数据:

CurrentTerm;

在当前Term中Server是否投票以及投票给谁的信息;

Log(所有LogEntry)。

一旦上述信息发生改变,Server必须在发出RPC响应之前持久化它们,并在崩溃恢复时读取这些信息。

对于其它不需要持久化的信息:

CommitIndex:恢复时初始化为0即可;

LastLogIndex/LastLogTerm:可以从持久化的Log中读取;

NextIndex:在成为Leader时初始化为LastLogIndex + 1;

MatchIndex:在成为Leader时初始化为0;


ZooKeeper

ZooKeeper是一个分布式协调服务,它提供了一组用于分布式同步的原语,这些原语具有一定的通用性,因此可以用它们实现特定的分布式同步需求。

ZooKeeper提供了一个类似文件系统的命名方式和层次结构,Client可以通过API创建、删除、修改某个路径下(如/app1/lock1)的“文件”,ZooKeeper将这个“文件”称为Z-Node,Z-Node之中可以存储元数据,也可以仅仅作为占位符来使用,ZK对Client应用在Z-Node上的操作提供了某种顺序保证。ZK将Z-Node分为如下3个类型:

Regular Z-Node:由Client完全负责其生命周期,包括创建和删除;

Ephemeral Z-Node:由Client创建,一旦ZK认为Client崩溃或者会话结束,ZK自动删除该Z-Node;

Sequential Z-Node:由Client创建,ZK将在Client提供的文件名后添加一个唯一的数字标识符后缀,该后缀反映Z-Node的创建顺序,ZK保证并发情况下不同的Z-Node具有不同的后缀。Sequential Z-Node既可以是一个Regular Z-Node也可以是一个Ephemeral Z-Node。

对于每个Z-Node,ZK会维护其最后一次修改的时间戳和一个表示版本号的计数器。

ZK提供如下的Client API:

create(path, data, flags):在路径path下创建Z-Node,并写入数据data,flags参数用于指示Z-Node的类型,如果创建成功,返回Z-Node的名称;

delete(path, version):如果路径path中的Z-Node的当前版本号与version参数相同,那么删除该Z-Node;

exists(path, watch):判断路径path中的Z-Node是否存在,watch参数用于Client设置监视,下同;

getData(path, watch):获取路径path中的Z-Node中的数据(包括包含时间戳和版本号的元数据);

setData(path, data, version):如果路径path中的Z-Node的当前版本号与version参数相同,那么更新其数据为data;

getChildren(path, watch):获取路径path中的Z-Node的下一层中的所有Z-Node的名称;

sync():等待ZK Server应用所有更新操作。

上述watch参数可以指示ZK监视某个Z-Node,当这个Z-Node发生修改,如被删除、被更新时,ZK会通过事件通知Client,监视是一次性的,一个watch只会通知Client一次。对于使用watch参数监视Z-Node的Client来说,ZK会保证Client在读取到被修改后的Z-Node的新状态之前接收到事件

ZK提供如下一致性保证:

来自所有Client的写请求都是强一致性的,但来自所有Client的读请求不能保证强一致性。

来自单个Client的所有操作(包括读和写请求)都遵循FIFO顺序。

因此对于一个被多个Client同时读写的Z-Node来说,ZK不保证每次Client的读请求都能得到最新的数据,而对于一个只被某个Client同时读写的Z-Node来说,ZK保证每次Client的每次操作都遵循FIFO顺序,读请求可以得到最新的数据。实际上ZK的幕后(称为Zab协议)就是通过改进的Raft实现的,但ZK允许所有Server处理读请求以增强读性能,代价是牺牲读操作的强一致性。ZK的容错恢复能力和Raft是相同的,只要大多数运行ZK的Server仍然存活,系统就能够正常对外服务。

如果你需要全局读操作的强一致性,你可以使用sync函数,sync将等待ZK Server应用所有更新操作,因此紧随sync之后的读操作(如getData)将读取到截止sync调用时的所有最新状态。

Client可以使用ZK提供的API实现自己的同步需求。

配置管理(Configuration Management):通过创建一个保存配置信息的Z-Node来实现配置管理,Client启动时读取Z-Node的信息并设置watch,一旦配置更新,Client将收到事件、读取新的配置,并再次设置watch。如果配置的更新涉及到很多数据,可以设置一个Ready Z-Node,Client读取并等待Ready Z-Node的修改,而只有当所有新的配置信息准备好之后才更新Ready Z-Node。

分组(Group Membership):通过创建一个表示组的Group Z-Node来实现,Client可以在其下级创建Child Z-Node来表示自己隶属于该组,并且Child Z-Node是Ephemeral和Sequential的,一旦Client崩溃或者失联ZK会自动删除其Child Z-Node。通过getChildren函数可以获取某个组的所有成员,如果想要监视组内成员的变化,可以通过watch参数监视Group Z-Node。

锁(Lock):通过创建一个表示锁的Lock Z-Node来实现,Client通过create函数创建该Z-Node并设置为Ephemeral,如果该Z-Node已经存在说明锁正在被占有,Client设置监视并在事件到达后重试,如果该Z-Node不存在且Client创建成功即成功持有锁,Client通过删除Z-Node来释放锁,由于Z-Node是Ephemeral的因此即便Client崩溃Z-Node也会被ZK自动删除。伪代码如下:

if exists("lock-file", watch=true)
     wait for event and retry
else if create("lock-file", data, flags={Ephemeral})
     successfully get lock
else
     retry

这种方式有一个缺点,即每次锁被释放都会有大量的Client试图竞争该锁,最终只有一个Client将成功持有该锁,这称为Herd Effect。可以进行优化:

n = create(“locks/lock-file-”, data, flags={Ephemeral, Sequential})
C = getChildren("locks", false)
if n is lowest znode in C
     successfully get lock
p = znode in C ordered just before n
if exists(p, true) 
     wait for event and retry getChildren

在此方法中,每个试图获取锁的Client均通过Sequential创建一个Lock Z-Node,Z-Node具有最小数字标识符后缀的Client持有该锁,其它Client通过watch等待位于自己前面的Client持有并释放锁,避免了Herd Effect。

需要注意的是:上述通过ZK实现的锁仅仅保证了排他性,但不能保证原子性,即持有锁的Client即便崩溃了它对数据的修改也不会撤销,锁仅仅只是交给下一个Client而已。


CRAQ

CRAQ是基于Chain Replication副本机制进行改进的,全称Chain Replication with Apportioned Queries,可以在提供容错恢复的前提下为read-mostly工作负载提供很高的性能。

Chain Replication使用一种链式副本结构,所有参与的Server通过链式结构进行连接并维护相同的数据,所有写请求都由头结点进行处理,头结点应用写操作,并将写操作向下传递到链的每个结点,每个结点都应用写操作,直到尾结点作出成功响应后,写请求视为已提交。所有读请求都由尾结点进行处理,由于尾结点总是具有最新的状态,因此可以保证该系统的强一致性。

CRAQ基于Chain Replication进行了改进,每个结点允许保存多个版本的数据,并维护版本号,一旦接收到写请求,每个结点应用写操作并添加新版本,直到写操作被提交之前,这个版本都被标记为Dirty,当尾结点接收到写操作后,它应用写操作,并将版本标记为Clean,此时该写请求视为已提交,尾结点通知前面所有结点将版本标记为Clean。一旦某个结点将某个版本标记为Clean,在此之前的所有Clean版本都可以删除,这意味着结点在正常工作的情况下只会存在一个Clean版本,如果结点拥有多于一个版本,那么最新版本肯定是Dirty的。

CRAQ的另一个改进在于,不同于Chain Replication只允许尾结点处理读请求,CRAQ允许所有结点处理读请求,当某个不是尾结点的结点接收到读请求时,它首先判断自己的最新版本是不是Dirty,如果最新版本是Clean的,那么结点直接用当前的最新版本响应该请求即可,如果最新版本是Dirty的话它向尾结点询问该数据的最新提交版本(Clean)并响应。注意尾结点在被询问某数据的最新提交版本时,即便该数据目前存在Dirty版本,也只需要立即返回该数据的Clean版本。因为数据的提交最终是在尾结点发生的,因此这么做并不违反强一致性,如果尾结点被询问时数据存在Dirty版本,说明该版本还未提交,因此不应该用于响应请求。实际上CRAQ提供了多种一致性选择:

强一致性(Strong Consistency):所有读请求都将读取到最近一次写请求的数据。

最终一致性(Eventual Consistency):读请求可能返回过时的数据。

有限度的最终一致性(Eventual Consistency with Maximum-Bounded Inconsistency):读请求可能返回过时的数据,但过时被限制在一定范围(通过版本号限制)。

由于所有结点都可以处理读请求,因此CRAQ可以提供正比于结点数量的读性能,这与Raft这种基于Leader的协议不同。

当某个结点崩溃时,如果是头结点那么第二个结点将成为新的头结点,如果是尾结点那么倒数第二个结点将成为新的尾结点,如果是中间结点那么其两侧的结点相连。需要注意的是Chain Replication本身不具备很强的容错恢复功能,如果仅仅是因为网络问题将这些结点分隔成无法互联的两组,那么可能会出现Split Brain问题,因此CRAQ一般不是单独使用的,而是配合诸如Paxos、Raft、ZooKeeper等协议进行外部协调。


Aurora

Aurora是Amazon公司设计的具有高容错和高性能的分布式关系型数据库系统,其基础是开源的MySQL,Aurora将数据库的存储系统分离出来并单独实现了一套针对数据库用途优化的存储机制,抛弃了传统的存储基础设施的通用性,达到了相当于MySQL的好几倍性能。

Aurora的容错是通过副本实现的,传统的MySQL为了实现容错,需要在各个副本上保存相同的数据并进行同步,同步的机制采用数据页的传输,所要求的网络容量很高:

上图展示了传统MySQL数据库副本的同步过程,数据库实例首先需要在本地生成Log(包括Redo Log和Undo Log)和BinLog,并定期(Checkpoint)将脏页写入到非易失性存储当中,为了防止数据写入存储时崩溃还需采用Double-Write方法,当这些完成之后,将所有这些操作所需的数据传输到副本实例,并在副本实例上合并这些内容,整个事务才算完成。这种机制可以保证两个实例具有相同的状态,因此都可以对外提供服务,为了确保正确性,一般只允许其中一个实例处理写请求,其它实例只服务只读请求。这种传统方法的缺点在于同步的性能不高,事务必须确保在副本中也成功提交才能响应请求,而且同步所需的网络资源也很高,需要传输Log、BinLog、数据页等,如果需要多个副本,同步的代价是很高的。

Aurora抛弃了这种传统的方法,将数据库的存储系统分离出来,并单独实现容错和恢复,Aurura的存储系统采用了基于Quorum的容错机制:

对于需要同步的N个Server,当Client想要写入数据时,它必须确保该写入成功应用到至少W个Server;当Client想要读取数据时,它必须从至少R个Server中读取该数据。

在上述设定中,只要W + R > N且W > N/2即可确保读取和写入的正确性。

例如,对于采用6个Server的Aurora存储系统,写入请求需要至少应用到4个Server,而读取请求需要至少读取3个Server,这样可以保证每一个读取请求都可以从至少1个Server读取到最新的数据,为了识别最新的数据,需要为每一个数据增加版本号。

在Aurora中,数据库实例仅需要通过网络将Redo/Undo Log传输到存储Server中:

如上图,6个存储Server分布在3个AZ中,每个AZ运行2个存储Server,AZ即Availability Zone,不同的AZ分布在不同的数据中心,因此提供更强的容灾能力。在3个数据库实例中,只有Primary Instance能够处理写请求,其余Replica Instance只能处理读请求。这些数据库实例都是基于开源的MySQL进行魔改的,具有与MySQL相同的查询处理、事务、锁、缓存、查询优化等功能,但Log的保存和应用、持久化存储、崩溃恢复都已经分离出来由存储Server负责,数据库实例不需要负责Log的应用和数据持久化。

当实例接收到写请求时,它仅仅将Redo/Undo Log传输到6个存储Server,并等待其中4个存储Server的成功应答,一旦接收到4个存储Server的响应,实例便可以直接响应写请求。存储Server中在非易失性存储中保存数据库数据,并且在易失性内存中缓存,存储Server接收到Log后,仅仅只会将Log添加到某个维护的Log列表中,并持久化这些Log(以便在崩溃后恢复),不需要立刻应用这些Log即可响应数据库实例。只有当存储Server接收到某个请求,该请求需要的数据必须应用Log后才能得到时,Server才会应用这些Log。如下图,只需要步骤1和步骤2完成了即可响应数据库实例,步骤4-8均可以在后台异步进行。

当实例接收到读请求时,它只需要读取其中3个存储Server的数据即可,并在其中找到版本号(通过LSN计算)最新的那个数据即可。实际上在Aurora中,实例维护了各个存储Server所拥有的Log的位置,因此实例在处理读请求时,只需要查询一个拥有足够Log的存储Server。

在存储Server中,Log并不是通过Checkpoint机制来应用的,存储Server以数据页为单位在内存中缓存数据,每个数据页都拥有自己的Log列表,当数据页需要更新时它能以最小的需求应用有关于自己的Log,Aurora将这个过程称为Page Materialization。更关键的是,由于存储系统和数据库实例是分离的,因此有关于Log的应用和数据的刷新都可以异步进行,而不影响数据库实例。

Aurora的性能很高,这得益于三点

网络中传输的数据只有Redo/Undo Log,不需要通过网络传输其它数据;

基于Quorum的容错机制,可以避免Server出错带来的性能下降;

存储系统可以在后台异步地、持续不停地、分布式地进行Log的应用和数据的持久化,数据库实例不需要写入任何数据,只需要维护缓存、传输Log即可。

Aurora可以提供以下能力的容错:

即便某个存储Server突然失联或者缓慢甚至停滞,整个系统仍可以保持很高的性能;

即便某个AZ中的所有存储Server全部崩溃,整个系统仍可以正常处理读写请求;

即便某个AZ中的所有存储Server外加另外一个AZ的某个Server崩溃,整个系统仍可以正常处理读请求;

一旦超出AZ+1个Server崩溃,整个系统就不能正常工作。

当某一个存储Server崩溃时,它需要恢复自己的状态,这是通过其它存储Server向其传输Log实现的,每个存储Server都维护并持久化自己Log的位置信息,并在崩溃时通过这些信息接收来自其它Server的Log,以便恢复到可以对外服务的状态。存储Server之间的Log传输是并行的,因此恢复的速度很快。


Frangipani

Frangipani是一个弹性的分布式文件系统,它的适用场景是比较集中的组织用于文件共享和协作,所有用户在相同的区域内并且距离比较近(即分布式指的是局域网分布),互相可以访问和修改彼此的文件,并且支持缓存的同时保证强一致性。Frangipani由三部分组成,分别是Frangipani File Server、Petal Server和Lock Service:

FFS(Frangipani File Server)运行在所有使用Frangipani的用户的电脑上,是一个标准的文件系统,可以运行在Linux系统上;

Petal是一个分布式存储系统,用于保存所有文件数据和信息,但Petal并不是一个专门用于文件系统的存储系统,它本身不理解文件系统的结构,仅仅提供比较原始的、通用的存储服务,Petal也支持副本,以提供容错性;

Lock Service用于协调各个用户(即各个FFS)的锁资源,锁用于保证一致性。

用户设备上的Frangipani服务

用户设备上的Frangipani服务

整个系统的Frangipani服务

整个系统的Frangipani服务

Frangipani使用本地缓存,即当用户访问文件资源时,设备上的FFS通过请求Petal获取相关的文件数据,然后用户所作的所有修改都仅仅保存在本地的FFS上,而不会立即发送给Petal,直到必要的时候才发送给Petal,即Frangipani的大部分逻辑都运行在用户设备上的FFS,因此天然具有可伸缩性,增加用户设备将自动增加系统可用的CPU资源,但这引出了两个问题:

在使用本地缓存的同时如何保证一致性?特别是在不同的用户访问和修改同个文件资源时,每个用户都应该能观察到其它用户所作的最新修改。

如何保证原子性?不同的用户修改同个文件资源时,不应该丢失任何一方的修改,Frangipani必须对所有用户的并发修改行为进行合理的排序。

如何保证隔离和容错?单个用户的设备或系统崩溃可以快速恢复,且不应该影响其它用户的正常使用。

Frangipani的一些设计:

FFS是一个可以运行在Linux系统上的文件系统实现,可以和系统内核协作,从用户角度上它和常规的UNIX风格的文件系统没有什么不同,底层的Frangipani驱动会负责和Petal交换数据,同时FFS也使用Linux系统的文件缓存,因此文件数据的修改直到fsync或者sync命令后才能持久化。

FFS只和Petal、Lock Service交换数据,FFS之间不直接沟通,Petal允许所有FFS请求和写入数据,因此必须保证所有的FFS是可信任的,这也限制了Frangipani的适用场景。

Petal本身提供副本和容错恢复能力(独立于FFS),原理类似于CRAQ的Chain Replication,FFS可以认为Petal是稳定的,不需要考虑Petal内部发生的崩溃。

FFS使用锁来保证一次性,并且锁的粒度是整个文件,而不是Petal中的某个数据块。

Petal使用虚拟地址空间进行寻址,地址空间的大小是2^64字节,这个地址空间按用途进行分区,如下图:

从左往右分别为:配置参数区域、日志区域、分配位图区域、文件Inode区域、小文件块区域、大文件块区域。每个小文件块是4KB,小于64KB的文件仅使用小文件块进行保存,大于64KB的文件可以使用大文件块进行保存,每个大文件块是1TB,这种配置下的Frangipani大约可以保存2^24个文件。

Frangipani实现缓存一致性的核心是Lock Service,LS是一个通用的读写锁(Multiple-Reader/Single-Writer)协调服务,可以为FFS分配锁定每个文件的读锁或者写锁,当多个FFS读取而不修改同个文件时读锁可以由多个FFS持有,当某个FFS想要修改某个文件时,它必须先持有该文件的写锁,且写锁和所有读写锁都是互斥的。LS可以运行在单独的Server上,也可以和Petal运行在相同的Server上,但逻辑上它们是独立的。和Petal一样,LS也不理解文件系统结构,仅仅只是协调锁资源。

具体地说,Frangipani要求:

任何FFS在缓存Petal中的文件数据时,必须同时持有该文件的锁;

任何FFS在释放文件的锁之前,必须将本地缓存写入到Petal中。

在LS中保存着每个文件的锁被哪个FFS所持有,同时每个FFS比也保存着自己持有哪些文件的锁。Frangipani使用粘性(Sticky)锁机制,即持有锁的FFS将一直持有锁(即便它已经不需要该锁),直到另一个FFS要求获取锁,FFS和LS之间使用4种RPC进行交互:

Request:由FFS向LS发送,表示FFS尝试获取某个文件上的锁;

Grant:由LS向FFS发送,表示LS授予FFS某个文件上的锁;

Revoke:由LS向FFS发送,表示LS请求撤销FFS某个文件上的锁;

Release:由FFS向LS发送,表示FFS同意撤销某个文件上的锁。

因此,当多个用户尝试修改同一文件时,锁机制将保证同一时间只有一个用户可以持有写锁,如果在A持有写锁的时候B尝试修改文件,LS将请求撤销A持有的锁,A如果正忙于修改锁定的文件,可以拒绝撤销; A如果已经不需要这把锁(由于粘性锁机制,A不会主动释放锁),A将接受撤销,并在释放锁之前将本地缓存写入到Petal,然后B获取到锁时就能观察到A所作的修改了。这种锁机制确实保证了本地缓存和Petal上数据的一致性,但缺点也很明显:它并不适用于并发量特别高、竞争特别强的场景。另一方面,由于Frangipani的锁粒度是整个文件(这简化了系统机制),因此会产生不必要的锁竞争,对于写入是十分不利的。事实证明,本地缓存使得Frangipani在FFS增多的时候获取几乎线性增长的读性能,但写性能却几乎没有增长甚至有所降低。

对于粘性锁机制,如果一个FFS一直持有锁而没有其它FFS争抢,那么它的本地缓存将永远不会写入到Petal,因此Frangipani强制了锁持有的最长时间以触发本地缓存的写入。

为了提高锁分配的性能,Frangipani将锁进行分组,并按组授予锁。为了保证原子性,Frangipani使用两阶段获取锁机制:在第一阶段,FFS提前确定所需的所有锁,为了确定,FFS可能要对文件的元数据进行扫描(例如遍历目录中的文件项),这个过程也需要获取锁;当FFS确定好所需的所有锁后进入第二阶段,FFS按照文件Inode的地址顺序(这是为了防止死锁,同时Frangipani精心设计Petal中的每个区域,确保每个区域不超过一个可能被共享的数据结构,这也是为了避免死锁)依次尝试获取所有锁,如果它发现某个文件的状态和第一阶段读取的有所不同说明另一个FFS已经修改了该文件,那么该FFS必须重新尝试两阶段获取锁,如果所有文件的状态和第一阶段读取的相同,说明FFS安全地锁定了所有要修改的文件,可以执行复杂的修改并保证原子性。

Frangipani使用写前日志(Write-Ahead Log,WAL)来实现崩溃恢复,所有的FFS在将本地缓存写入Petal之前,必须生成对应所有操作的日志,并将日志传输到Petal当中,只有当所有修改的日志都持久化到Petal后,FFS才被允许写入本地缓存。Frangipani的日志设计:

只生成文件系统中元数据的修改日志,日志中并不包含文件的实际内容,即Frangipani在崩溃恢复后保证文件系统在元数据层面上是完整的,但文件数据可能丢失(这种机制模仿了Unix风格的文件系统,在执行fsync和sync命令之前文件数据并不保证持久化,因此从用户角度来说是可以理解的,而且如果日志要保存文件数据的修改,不仅带来非常大的性能负担,还会带来数据泄露的风险)。

Frangipani的日志保存在Petal中而不是在FFS中,这与常规的分布式系统有所不同,并且Petal中的日志是Per FFS的,不同的FFS的日志是独立的。

当一个FFS崩溃时,可能处于如下几种状态:

(1)已经写入日志但没有写入完整的脏数据,此时Frangipani会协调另一个存活的FFS来执行Petal中的日志,以保证文件系统中元数据的完整性(损失了文件数据)。

(2)尚未写入日志,因此也未写入任何脏数据,此时另一个存活的FFS会发现Petal中没有任何日志需要执行,文件系统中的元数据依旧是完整的(损失了崩溃前本地缓存中的所有修改)。

(3)已经写入完整的脏数据,此时不会损失任何修改。

在上述过程中省略对于锁资源的恢复,实际上LS对锁的持有者实现租用(Lease)机制,默认的过期时间是30秒,锁的持有者必须定期续期,否则LS将在一段时间后视该FSS已经崩溃,并进行上述恢复过程。

注意,Frangipani在恢复完成之前不能释放这些锁,这可以避免系统向外暴露损坏的状态,同时在恢复的过程中不能执行所有日志,因为有些日志对应的操作可能已经成功写入到Petal了,这些日志不需要执行。为了保证恢复过程只执行那些因为持有锁期间崩溃而未写入到Petal的日志,Frangipani为每个512KB的日志块设置LSN(Log Sequence Number),并且为每个512KB的元数据块设置版本号(Version Number),恢复过程中比较LSN和版本号来判断是否需要执行日志。当恢复过程完成后,Frangipani释放所有锁,其它FFS可以正常访问和修改这些资源。最后,Frangipani在同一时间只允许一个FFS在某个Petal Server上执行恢复过程,以避免并行恢复的干扰。


二阶段提交(2PC)

2PC全称Two-Phase Commit即二阶段提交(或两阶段提交),是一个分布式事务提交协议。如果事务涉及的数据分布在不同的Server上(由于副本或分片),此时就要保证事务执行后,所有Server的数据都是一致的,注意即便每个Server支持本机事务的ACID也无法保证这件事,某个Server事务执行失败而其它Server的事务却执行成功就会导致数据不一致,2PC就是为了解决这个问题的,参与事务的所有Server通过RPC履行2PC协议,最终的结果只有两种:事务要么在所有Server上成功执行,要么在所有Server上失败。

在2PC协议中,发起分布式事务的Server称为协调者(Coordinator),其它执行事务的Server称为参与者(Participant),协调者并不能简单地通过RPC让参与者执行本机事务来实现分布式事务,原因是协调者并不能保证参与者都能成功执行事务,2PC的核心在于协调者在逻辑上发起一个更大的事务,其它参与者执行的所有本机事务都作为这个逻辑事务的子部分,以此来实现原子性。

下面假设协调者C发起一个分布式事务,包含A、B、C三个参与者,它们分别需要在本机上执行事务X、Y和Z,2PC最终将保证事务X、Y、Z要么同时成功要么同时失败。

2PC的第一个阶段(投票阶段)由协调者开始,协调者C决定发起分布式事务,生成该分布式事务的记录并持久化,然后通过RPC通知A、B、C分别执行事务X、Y、Z并等待答复,当A、B、C接收到RPC后,在本机上创建事务,并执行提交以外的所有操作,但不能提交该事务,随后答复协调者C。

注意,在上述过程中由于网络问题,协调者和参与者发送的RPC可能会丢失或者出错,甚至会重复到达,所以2PC协议中的所有Serve都必须具备重发和处理重复RPC的机制,保证所有行为仅仅执行一次。

当协调者C收到所有参与者(A、B、C)的肯定答复后,(通过RPC)向所有参与者发送PREPARE(意指prepare to commit)信息,告知参与者准备提交事务,每个接收到该信息的参与者判断自己是否能够提交本机事务,如果因为某些原因(例如其它事务的执行已经造成了冲突)无法提交,它作出否定答复并中止事务。如果可以提交,那么它暂时提交该事务,并作出肯定答复,并在作出答复前生成该答复的记录并持久化,这里的“暂时提交”指的是参与者并不能真正地提交该事务,至少不能暴露该事务造成的影响,并且保留足够的信息以便后续仍然能够中止该事务。

2PC的关键转折点就在于参与者对PREPARE信息的答复,一旦参与者作出答复,那么参与者在未来都无权决定该事务是提交还是中止,只能等待协调者作出下一步的指示,如果参与者一直没有收到协调者的下一步信息,也只能将事务维持在“暂时提交”的状态,不能擅自提交或中止事务,即便其它事务已经被阻塞。

在对PREPARE信息作出答复之前,参与者可以单方面拒绝执行或者中止事务,例如在最开始收到协调者发起的信息时参与者可以拒绝执行事务并作出否定答复,在收到PREPARE信息时参与者也可以单方面中止事务并作出否定答复,在这两种情况下协调者只要收到任何一个否定答复或者迟迟未能收到回复,它都将向所有参与者发送ABORT信息,参与者收到ABORT信息后回滚所有已执行但仍未真正提交的事务,整个分布式事务以失败告终。

如果协调者C收到了所有参与者(A、B、C)对PREPARE信息的肯定答复,就进入2PC的第二个阶段(提交阶段),它将作出最终的提交决定,生成分布式事务提交的记录并持久化,然后向所有参与者发送COMMITED信息,当参与者收到COMMITED信息后,真正提交本机事务,当所有参与者都提交本机事务后,整个分布式事务就成功提交了。

整个分布式事务是否能够提交,最终的决定权都在协调者手中,协调者一旦作出决定就无法撤销:要么发送ABORT信息,要么发送COMMITED信息。同时协调者必须无限期地保留并持久化分布式事务的结果,这是因为ABORT信息和COMMITED信息可能因为网络问题无法到达参与者,这时的参与者将处于等待状态但无权提交或中止本机事务,参与者也可能在等待的过程中发生崩溃,并在相当久的时间后才恢复,因此协调者必须保证自己能够在未来响应参与者对分布式事务的查询。

如果参与者在等待的过程中发生崩溃,并在某段时间后重启,它恢复所有处于PREPARED状态的事务在崩溃前的状态(包括重做日志,获取所需的锁资源),并仍需要询问协调者并等待来自协调者的指示。因此2PC并不能保证所有参与者能够同步提交事务,只能保证最终所有参与者要么都成功提交事务,要么都中止事务。

如果协调者发生了崩溃,那么整个2PC过程将会停滞,所有参与者都将永久等待,直到协调者恢复重启,在2PC协议中协调者是一个单点故障


Spanner

Spanner是Google研发的一个全球分布式关系型数据库,可以将大量的数据进行分片、复制并分布在全球的各个地理位置,同时提供一致性和容错恢复能力,Spanner是一个成功的商业数据库,作为Google Cloud的一部分对外出售。Spanner提供所谓的外部一致性(External Consistency):如果事务A在事务B启动前提交,那么事务B的提交时间戳必定大于事务A的提交时间戳。注意这里使用“提交时间戳”的原因是Spanner使用精准的时间戳来表示事务的提交顺序,换句话说,事务的提交顺序将构成一个唯一的时间表,外部一致性保证该时间表和事务的启动顺序是一致的。

Spanner将一个完整的部署称为Universe,每个Universe具有多个Zone,每个Zone分布在不同的数据中心,数据通过多个Zone进行分片或复制,每个Zone由多个SpanServer组成,其中有一个ZoneMaster,每个Zone有一个LocationProxy,用于接收用户请求并重定向到合适的SpanServer。每个Universe有一个单例的UniverseMaster和PlacementDriver,前者用于设置和显示全局配置信息和状态,后者用于进行SpanServer之间的数据迁移和负载均衡。

Spanner使用改进的Paxos来实现复制,数据通过不同的Zone进行复制,每个Zone位于不同的数据中心(每个数据中心可以运行多个Zone),维护同个数据的所有SpanServer组成一个Paxos Group,其中有一个Leader,数据通过Paxos进行同步,同时每个副本都有一个Transaction Manager,负责事务的处理:

 

Spanner支持分片,分片和复制是独立的两个维度,复制通过Paxos Group来配置,而分片通过Directory来配置,因此可以灵活地组合分片策略和复制策略。

Spanner使用多版本存储数据(类似于MVCC),每个版本都被分配一个提交时间戳(Timestamp),同时每个事务也被分配一个时间戳。Spanner将事务分为三种类型:读写事务(Read-Write Transaction)、只读事务(Read-Only Transaction)和快照读取(Snapshot Read),其中只读事务是需要在事务启动时明确向Spanner指定的,并非简单的不具有写操作的事务,而快照读取允许Client读取特定版本的数据,而非最新的数据。

对于读写事务来说,所分配的提交时间戳即事务提交时的时间戳;对于只读事务来说,所分配的提交时间戳则是事务启动时的时间戳。

对于读写事务来说,Spanner使用锁(2PL)来控制并发,Client的提交请求首先将被定向到地理位置最接近的Zone,然后Zone锁定数据并提交修改,同时底层的Paxos将触发其它Zone上的副本也进行提交,当这些都完成之后才释放锁,并向Client答复。注意,Client对数据的修改在提交之前都是缓存在本地的,因此在同个事务中读取操作是无法读取到当前事务写入的数据的。

如果读写事务涉及的数据分布在多个Zone当中,过程也是类似的,只是此时Spanner需要使用2PC来保证原子性,因此需要一个副本来充当事务协调者(Coordinator),注意SpanServer是运行在Paxos上的,因此2PC的协调者单点故障得到了缓解。

对于只读事务来说,Client的请求首先被定向到地理位置最接近的Zone,该Zone判断自己所拥有的数据是否是最新的,每个副本都维护一个Safe Time时间戳,这个时间戳是根据Paxos和Transaction Manager中最新处理的记录的时间戳来计算的,当Zone接收到Client的读请求时如果发现当前事务的时间戳并不在Safe Time范围内,那么它就不能处理该请求。
Spanner如何保证一致性呢?由于每个事务和每个数据版本都具有一个时间戳,因此Spanner只需要保证具有提交时间戳TS的事务只能读取最新时间戳小于TS的数据,同时不能写入最新时间戳大于TS的数据。同时,为了保证外部一致性,即事务的提交顺序和事务的启动顺序一致,提交时间戳较大的事务必须等待所有提交时间戳较小的事务提交完成后才能提交,这种机制十分依赖于时间戳的准确性,因此Spanner维护了一个用于获取精确时间戳的基础设施,即TrueTime

TrueTime的原理是通过遍布全球各地的原子钟和GPS结点来获取准确时间,每次获取时间戳时TrueTime都会收集来自多个源的时间,并使用统计算法来计算出一个误差被限制的结果,TrueTime的创新在于它返回的是一个时间戳区间[Earliest, Latest]而不是单个时间戳,Spanner主要通过3种命令使用TureTime:

TT.now():返回现行时间戳区间[Earliest, Latest],并保证真实的时间戳必定位于该区间当中。

TT.after(t):如果确保真实的时间戳大于t,返回真,如果不确定则返回假。

TT.before(t):如果确保真实的时间戳小于t,返回真,如果不确定则返回假。

当协调者接收到事务的提交请求时,它调用TT.now()得到当前时间戳区间,并将Latest作为事务的提交时间戳(这称为Start原则),即协调者收到提交请求的实际时间必定早于事务的提交时间戳。同时Spanner要求其它事务直到TT.after(Latest)为真之前都不允许观察到该事务,即协调者必须等到TT.after(Latest)为真时才能真正地提交事务(这称为Commit Wait原则),即事务的真实提交时间是稍晚于事务的提交时间戳的。

假设事务A和B的实际开始时间分别为SA和SB,事务A和事务B的提交时间戳分别为TSA和TSB,事务A和事务B的实际提交时间分别为CA和CB,事务B的提交请求到达协调者的实际实际为RB,假设事务B在事务A提交后才启动,即SB > CA,接下来证明TSA < TSB:

由Commit Wait原则有:TSA < CA

由假设有:CA < SB

由因果律有:SB <= RB

由Start原则有:RB < TSB

综上可以得到,若事务B在事务A提交后才启动,必定有TSA < TSB,这即是Spanner保证的外部一致性,即事务的启动顺序和事务的提交时间戳是一致的。


FaRM

FaRM是Microsoft研发的一个支持分布式事务的强一致性(Strict Serializable)的高性能高可用的内存数据库。FaRM的创新在于它充分利用了NVDRAM和RDMA:

NVDRAM:Non-Volatile DRAM,即非易失性动态内存,FaRM将数据库的绝大部分数据都保存在NVDRAM中以追求性能,FaRM实现非易失性的原理是为所有Server上的DRAM提供锂电池电源,当设备断电时储备的电源足够将DRAM中的数据写入到SSD中,微软将这种方法称为Distributed Uninterruptible Power Supply (UPS)。

RDMA:Remote Direct Memory Access,即远程内存直接访问,这种方式允许一个Server直接访问另一个Server上的DRAM而不需要CPU干预,FaRM大量利用了RDMA来实现RPC。

为了利用RDMA来实现RPC,FaRM需要一种特殊的网卡(NIC),这类网卡支持Kernel Bypass即内核旁路,这允许应用程序直接访问网卡上的数据而不需要内核的参与,同时FaRM实现了一种特殊的网卡驱动,可以指示网卡执行RDMA并接收结果。

Microsoft为FaRM配置的运行环境中,所有Server都使用Windows Server系统,每个Server拥有128-512G的DRAM和至少2个支持内核旁路的NIC,每个NIC的传输带宽为56Gbps,几乎所有数据都保存在DRAM中,所有Server都配备UPS,以便在断电的时候将DRAM中的数据写入到SSD。所以FaRM和常规的分布式数据库非常不同,是一种激进的尝试,测试表明FaRM在TATP(一种测试标准,读占比约70%)下每秒可以执行14亿个事务。

由于数据保存在DRAM而不是SSD中,因此FaRM为应用程序提供了跨Server的虚拟地址空间抽象,虚拟地址将被映射到具体的DRAM地址当中。每个Server在任何时候都可以发起一个事务,发起事务的Server自动成为事务协调者(Transaction Coordinator),FaRM使用乐观并发控制协议(OCC),每个数据对象都具有一个64位的版本号,事务中的修改先缓存在本地,在成功提交后才对其它事务可见,如果在提交过程中发现冲突那么事务中止并回滚。

FaRM将整个虚拟地址空间分为多个Region,每个Region为2GB,由一个全局的Configuration Manager(CM)来分配Region;FaRM使用主从复制,读操作总是读取Primary上的数据,CM负责管理Region到Primary和Backup的映射。一个完整的FaRM配置可以通过<i, S, F, CM>来确定,其中i是配置标识符(64位),S是Server集合,F是Server到容错区域、主从复制的相关映射,CM则是作为Configuration Manager的Server,FaRM通过ZooKeeper来迁移配置。

所有Server基本只使用RDMA来沟通,FaRM将RDMA分为两种类型,一种是Read RDMA,用于读请求,当ServerA想读取ServerB上的某个数据对象时,A上的FaRM应用通过内核旁路向NIC发起请求,随后NIC通过网络线路向B上的NIC发起请求,B上的NIC执行RDMA直接访问B上的内存且不需要B上的CPU进行任何干预,并将结果返回给A,A上的NIC将结果通过内核旁路发送给FaRM应用。另一种是Write RDMA,用于写请求,过程是类似的,区别只在于A上的NIC会将欲写入的数据传输给B上的NIC。

FaRM使用一种类似于2PC的方法来提交事务,如下图,虚线代表Read RDMA,实线代表Write RDMA,点线代表ACK,矩形代表数据。FaRM的提交过程分为5个阶段:

发起事务的一方(协调者)首先通过Read RDMA读取所需的数据对象,并在本地进行修改,这便是图中的Execute Phase,完成之后开始提交,进入Commit Phase:

(1)LOCK阶段:协调者向涉及事务写入过的数据对象的所有Primary发送LOCK日志项,这包括事务ID、数据对象的版本号和数据对象的新值,当Primary接收到LOCK时,尝试对这些数据对象进行锁定,但在锁定前必须判断当前的版本和LOCK日志项中的版本是否一致,如果不一致则协调者将中止事务。如果数据对象上的锁正在被另一个事务所持有,Primary不会等待,而是立即作出失败响应,协调者中止事务。注意:Primary此时不能写入新数据。

(2)VALIDATE阶段:协调者通过Read RDMA再次读取事务读取但未写入的所有数据对象,并判断它们的版本号是否和Execute Phase时读取的相同,如果不同那么中止事务。

(3)COMMIT BACKUP阶段:协调者向涉及事务写入过的数据对象的所有Backup发送COMMIT-BACKUP日志项,这包括事务ID、数据对象的版本号和数据对象的新值,并等待所有Backup的肯定答复,只有收到所有Backup的肯定答复后才能进入下一阶段,否则只能等待或中止事务。注意:Backup此时不需要写入新数据。

(4)COMMIT PRIMARY阶段:协调者向涉及事务写入过的数据对象的所有Primary发送COMMIT-PRIMARY日志项,仅包括事务ID,当Primary接收到COMMIT-PRIMARY时,就可以写入新数据并释放数据对象上的锁,事务提交完成并作出肯定答复。协调者一旦收到任何一个Primary的肯定答复,就可以向应用程序作出答复表示事务执行完成。

(5)TRUNCATE阶段:当协调者接收到来自所有Primary对COMMIT-PRIMARY的肯定答复后,就可以通知所有Primary和Backup清理日志项,此时Backup写入新数据。

在这个提交过程中,事务真正提交的时刻在VALIDATE通过之后,而多个事务将在LOCK阶段加锁时被序列化。对于只读事务来说,由于没有写入任何数据对象,因此也不需要加锁,整个过程只需要Read RDMA,甚至不需要被读取的Server上的CPU进行任何干预,因此性能非常高。

实际上这个提交过程是有瑕疵的,它并不能解决Fuzzy Read的问题:

T1:R(X)          VALIDATE                                                                                                            COMMIT

T2:R(X)                                     LOCK          VALIDATE          W(X)          COMMIT

上述过程满足FaRM的提交过程,但T1将读取到过时的X。FaRM论文中并没有介绍这种情况下如何处理,显然需要有更多的机制来防止这种错误。


MapReduce

MapReduce是一种简单的可以运行在集群上的大数据处理编程模型,它主要有两个阶段:Map和Reduce,MapReduce以一组KeyValue为输入,最终输出另一组KeyValue。在MapReduce中,Server被分为Master和Worker两种,Worker向Master定期询问,Master为Worker分配任务(要么是Map任务,要么是Reduce任务),Worker执行完任务后通知Master,Master维护全局的任务信息以协调任务的执行。

MapReduce通常运行在一个分布式文件系统(如GFS)之上,这样可以适应大数据的文件分布情况来进行集群计算,每个Server上的数据通常由该Server或距离该Server最近的Server上的MapReduce来处理,这样就节省了数据传输到各个Server的网络资源。同时借助分布式文件系统,MapReduce也无需关心数据传输的细节。

在Map阶段中,Worker读取各自的输入文件,并将其中的KeyValue进行分组,分组的逻辑取决于具体的应用场景,Map任务结束后将输出一组Intermediate KeyValue,即

map (k1,v1) → list(k2,v2)

reduce (k2,list(v2)) → list(v2)

当Map阶段结束后,所有输入文件中的KeyValue都被计算为Intermediate KeyValue,它们将被传输到各个执行Reduce任务的Worker所在的Server,每个执行Reduce任务的Worker将从这些Intermediate KeyValue选择它们所负责的那一部分(通常按Key分配),并进行合并。当Reduce阶段结束后,所有的Intermediate KeyValue都被计算为最终合并输出的KeyValue,这些输出分布在执行Reduce任务的Worker所在的Server,应用可以按需求获取这些结果。

Worker通过向Master询问来获取任务,Master可以为Worker分配Map任务或者Reduce任务,并期望某个时间段内能够接收到任务完成的通知,如果Master无法在某个既定的时间收到该Worker的通知,那么它会认为该Worker已经不可用,并将该任务分配给另一个Worker。

在Master中维护类似如下信息:

type Master struct {
    //用于元数据上的锁
    MatedataMutex sync.Mutex
    //当前阶段
    Phase int
    //输入文件
    InputFiles []string
    //输入文件数
    FileNum int
    //Map任务是否分配
    MapAllocatedFlags []bool
    //Map任务是否完成
    MapCompletedFlags []bool
    //Map任务完成数
    MapCompletedNum int
    //Reduce任务最大数量
    NReduce int
    //Reduce任务是否分配
    ReduceAllocatedFlags []bool
    //Reduce任务是否完成
    ReduceCompletedFlags []bool
    //Reduce任务完成数
    ReduceCompletedNum int
}

当Master接收到Worker的任务申请时:

// worker申请任务
func (m *Master) AskForTask(args *AskForTaskArgs, reply *AskForTaskReply) error {
    m.MatedataMutex.Lock()
    defer m.MatedataMutex.Unlock()
    if m.Phase == MapPhase {
        //处于Map阶段,分配Map任务
        for index, file := range m.InputFiles {
            if !m.MapAllocatedFlags[index] {
                //找到可分配的Map任务
                //fmt.Printf("Map Task Allocated %v\n", index)
                reply.TaskType = MapType
                reply.NReduce = m.NReduce
                reply.Filename = file
                //任务编号与文件索引一致
                reply.TaskNumber = index
                m.MapAllocatedFlags[index] = true
                //等待任务完成
                go m.WaitForTask(MapType, index)
                return nil
            }
        }
    } else if m.Phase == ReducePhase {
        //处于Reduce阶段,分配Reduce任务
        for index := 0; index < m.NReduce; index++ {
            if !m.ReduceAllocatedFlags[index] {
                //找到可分配的Reduce任务
                //fmt.Printf("Reduce Task Allocated %v\n", index)
                reply.TaskType = ReduceType
                //任务编号与Reduce索引一致
                reply.TaskNumber = index
                reply.NReduce = m.NReduce
                m.ReduceAllocatedFlags[index] = true
                //等待任务完成
                go m.WaitForTask(ReduceType, index)
                return nil
            }
        }
    }
    //否则提示worker等待
    reply.TaskType = WaitType
    return nil
}

// 等待任务完成
func (m *Master) WaitForTask(taskType int, taskNumber int) {
    //等待10秒
    time.Sleep(time.Second * 10)
    m.MatedataMutex.Lock()
    defer m.MatedataMutex.Unlock()
    //如果10秒后任务未完成,将其标记为未分配
    if taskType == MapType {
        if !m.MapCompletedFlags[taskNumber] {
            m.MapAllocatedFlags[taskNumber] = false
            //fmt.Printf("Map Task Timeout %v\n", taskNumber)
        }
    } else if taskType == ReduceType {
        if !m.ReduceCompletedFlags[taskNumber] {
            m.ReduceAllocatedFlags[taskNumber] = false
            //fmt.Printf("Reduce Task Timeout %v\n", taskNumber)
        }
    }
}

当Master接收到Worker的任务完成通知时:

// worker完成任务
func (m *Master) CompleteTask(args *CompleteTaskArgs, reply *CompleteTaskReply) error {
    m.MatedataMutex.Lock()
    defer m.MatedataMutex.Unlock()
    if args.TaskType == MapType {
        //Map任务完成
        //fmt.Printf("Map Task Completed %v\n", args.TaskNumber)
        m.MapCompletedFlags[args.TaskNumber] = true
        m.MapCompletedNum++
        //判断Map任务是否全部完成,如果是就进入Reduce阶段
        if m.MapCompletedNum == m.FileNum {
            m.Phase = ReducePhase
        }
    } else if args.TaskType == ReduceType {
        //Reduce任务完成
        //fmt.Printf("Reduce Task Completed %v\n", args.TaskNumber)
        m.ReduceCompletedFlags[args.TaskNumber] = true
        m.ReduceCompletedNum++
        //判断Reduce任务是否全部完成,如果是就进入Completed阶段
        if m.ReduceCompletedNum == m.NReduce {
            m.Phase = CompletedPhase
        }
    }
    return nil
}

Worker只需不断向Master申请并执行任务,直到Master通知整个MapReduce已经结束:

unc Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    for {
        //向master申请任务
        reply, ret := AskForTask()
        //fmt.Println("Asked Task")

        if !ret {
            //fmt.Println("Worker existed")
            break
        }

        switch reply.TaskType {
        case WaitType:
            //master提示等待1秒
            //fmt.Println("Sleeping")
            time.Sleep(time.Second)
            continue

        case MapType:
            //master分配了Map任务
            //fmt.Printf("Mapping %v\n", reply.TaskNumber)
            file, err := os.Open(reply.Filename)
            if err != nil {
                log.Fatalf("cannot open %v\n", reply.Filename)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v\n", reply.Filename)
            }
            file.Close()

            //执行Map得到KeyValue数组
            kva := mapf(reply.Filename, string(content))
            tfiles := make([]*os.File, reply.NReduce)
            tencoders := make([]*json.Encoder, reply.NReduce)

            //分配临时文件和Json编码器
            //临时文件用于KeyValue分组并分配给各个Reduce任务
            //临时文件的数量为NReduce
            for i := 0; i < reply.NReduce; i++ {
                //tfiles[i], err = os.Create(fmt.Sprintf("mr-%v-%v", reply.TaskNumber, i))
                tfiles[i], err = ioutil.TempFile("", "Temp")
                if err != nil {
                    log.Fatalf("cannot create temp file for %v-%v", reply.TaskNumber, i)
                }
                tencoders[i] = json.NewEncoder(tfiles[i])
            }

            //将KeyValue以Json格式写入临时文件
            for _, kv := range kva {
                enc := tencoders[ihash(kv.Key)%reply.NReduce]
                err = enc.Encode(&kv)
                if err != nil {
                    log.Fatal("cannot encode keyvalue\n")
                }
            }

            //重命名临时文件
            //命名格式:mr-TaskNumber-i
            for i := 0; i < reply.NReduce; i++ {
                os.Rename(tfiles[i].Name(), fmt.Sprintf("mr-%v-%v", reply.TaskNumber, i))
            }

            //提交Map任务完成的RPC
            complete := CompleteTaskArgs{}
            complete.TaskNumber = reply.TaskNumber
            complete.TaskType = MapType
            _, _ = CompleteTask(complete)

            //fmt.Printf("Mapped %v\n", reply.TaskNumber)

        case ReduceType:
            //master分配Reduce任务
            //fmt.Printf("Reducing %v\n", reply.TaskNumber)

            //枚举Map任务生成的临时文件
            paths, err := filepath.Glob(fmt.Sprintf("mr-*-%v", reply.TaskNumber))
            if err != nil {
                log.Fatalf("cannot glob paths %v\n", reply.TaskNumber)
            }
            //fmt.Printf("len paths %v\n", len(paths))

            //kva存放最终结果
            kva := []KeyValue{}
            var waitGroup sync.WaitGroup
            var mutex sync.Mutex

            for _, path := range paths {
                //使用协程并发处理各个临时文件
                waitGroup.Add(1)
                go func(path string) {
                    defer waitGroup.Done()
                    rfile, err := os.Open(path)
                    defer rfile.Close()
                    if err != nil {
                        log.Fatalf("cannot open file %v\n", path)
                    }
                    //使用Json解码器读取临时文件
                    dec := json.NewDecoder(rfile)
                    for {
                        var kv KeyValue
                        if err := dec.Decode(&kv); err != nil {
                            //log.Fatalf("caonot decode %v\n", err.Error())
                            break
                        }
                        mutex.Lock()
                        //保存最终结果
                        kva = append(kva, kv)
                        mutex.Unlock()
                    }
                }(path)
            }
            waitGroup.Wait()

            //对kva按Key排序
            sort.Sort(ByKey(kva))

            //将最终结果合并后输出到最终文件:mr-out-i
            ofile, err := os.Create(fmt.Sprintf("mr-out-%v", reply.TaskNumber))
            if err != nil {
                log.Fatalf("cannot create ofile %v\n", reply.TaskNumber)
            }
            i := 0
            for i < len(kva) {
                j := i + 1
                //合并相同项结果
                for j < len(kva) && kva[j].Key == kva[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, kva[k].Value)
                }
                output := reducef(kva[i].Key, values)

                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

                i = j
            }

            ofile.Close()

            //提交Reduce任务完成的RPC
            complete := CompleteTaskArgs{}
            complete.TaskNumber = reply.TaskNumber
            complete.TaskType = ReduceType
            _, _ = CompleteTask(complete)

            //fmt.Printf("Reduced %v\n", reply.TaskNumber)
        }

    }
}

Spark

Spark是专为大规模数据处理而设计的快速通用的计算引擎,通过尽可能地将数据集保存在Server的内存当中,并协调多个Server完成数据集的计算,达到非常高的大数据分布式处理性能。在Spark中,将每个Server处理的数据集称为RDD(Resilient Distributed Datasets),RDD是不可变的(只读的),RDD只能通过两种方式创建:(1)通过读取持久化存储中的数据文件来创建或(2)通过对某一个RDD进行某种操作,Spark将这些操作称为Transformation,包括map、filter、join

Spark的创新在于它尽可能地将RDD保存在内存而不是外存当中,只有在内存空间不足的情况或者用户显式要求下Spark才会考虑将RDD持久化到外存,这使得Spark基本不含有太多写入存储、从存储中读取的操作,所有的操作都在内存中进行,因此效率很高。由于RDD是不可变的,因此Spark可以很简洁地表达RDD之间的关系,除了最开始从存储中读取数据文件创建的RDD外,其它RDD都是通过在另一个RDD上施加某个Transformation得到的,因此如果某个Server丢失了某个RDD,它只需从先前的RDD中再次施加该Transformation就可以得到。

除了Transformation之外,用户还可以对RDD进行计算以获取所需的数据,例如count、collect、save,Spark将这些计算称为Action。当用户对RDD进行Transformation时,Spark并不会立即进行处理,而是将这些操作记录下来,只有当用户执行Action获取数据时,Spark才会真正地执行Transformation。

和MapReduce一样,Spark通常运行在某个分布式文件系统上,最常见的是HDFS。为了使用Spark,用户(即大数据开发者)启动一个Driver程序,并与众多Worker节点相连接,这些Worker节点通常也允许在HDFS之上,因此它们可以利用HDFS分区数据文件。Driver将定义最基础的几个RDD,并跟踪维护所有RDD的谱系图(Lineage Graph),这些谱系图将描述所有RDD的依赖关系,一旦用户激活数据处理,Driver将通知所有Worker,Worker负责不同的数据分区,由于每个Worker可能拥有不同进度的RDD,因此Worker只是根据谱系图按需进行数据处理。Spark提供的用户接口包括如下,这些接口通过Scala和Java语言实现:

其中某些操作如map、filter是可以在多个Worker并行执行的,但某些Transformation则比较特殊如join,为了执行join操作,Spark必须等待所需的所有RDD都已计算完成,为了区分这两种Transformation,Spark将RDD之间的依赖关系分为两种类型:Narrow Dependency和Wide Dependency,Narrow表示每个分区上的Parent RDD最多只会被一个分区上的Child RDD所使用,而Wide表示每个分区上的Parent RDD可能被多个分区上的Child RDD所使用。map、filter等属于Narrow,而join、groupByKey等则属于Wide。一个PageRank的示例如下,在这段代码中,直到ranks.collect方法执行时,Spark才会真正开始处理数据。

// Load graph as an RDD of (URL, outlinks) pairs
val links = spark.textFile(...).map(...).persist()
var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
    // Build an RDD of (targetURL, float) pairs
    // with the contributions sent by each page
    val contribs = links.join(ranks).flatMap {
        (url, (links, rank)) =>
        links.map(dest => (dest, rank/links.size))
    }
    // Sum contributions by URL and get new ranks
    ranks = contribs.reduceByKey((x,y) => x+y)
    .mapValues(sum => a/N + (1-a)*sum)
}
var output = ranks.collect()

对应的谱系图如下:

由于Spark所使用的RDD抽象非常简明,因此容错恢复和性能优化都很容易实现。Spark可以根据谱系图调度那些拥有最多所需RDD的Worker来执行任务,如果某个任务执行失败了或者某个Worker崩溃了,Spark只需找到另一个合适的Worker即可,通常Spark会选择拥有其Parent RDD的Worker,这些Worker可以很快计算出所需的RDD。

尽管Spark尽可能将RDD保存在内存当中,但内存不足时Spark也必须将RDD持久化到外存当中,同时为了防止容错恢复所需的操作链过长,Spark也需要定期执行Checkpoint,特别是对具有Wide Dependency的操作而言,数据的丢失可能会引起整个流程重新执行。


Memcached(Facebook)

Memcached是一个分布式的高速缓存系统,类似于Redis,Facebook使用改进的Memcached来作为其社交网站运行的缓存中间件,每秒处理超过十亿的请求,并存储超过数万亿的数据项。Facebook的Memcached系统整体架构如下:

其中数个FE(Front-End)集群(包括WebServer和Memcached)和一个Storage集群组成一个Region,其中Storage集群是MySQL数据库,Facebook使用MySQL基于日志的同步功能来实现复制,每个Storage集群包括一个完整的数据副本。

在了解这个系统的运行机理之前,有必要了解Facebook对一致性的要求,由于社交网络中用户消费内容的占比远远超过于创建内容,读操作的占比远超过写操作的占比,因此缓存是十分重要的,这意味着系统中的Memcached集群将承受远超Storage集群能够承受的负载,因此Facebook必须解决缓存穿透、缓存击穿、缓存雪崩的问题,同时要保证当Memcached Server出现故障之后请求流不会冲垮背后的Storage Server。Facebook对于缓存一致性的要求不需要那么强,允许返回一定限度内过期的缓存,这不会影响用户体验,但对于某一个用户来说,必须能够读取到其刚刚写入的内容,避免令人迷惑。

读取和写入的大致操作流程如下:对于读操作来说,WS(WebServer)首先尝试访问缓存,如果缓存命中则不需要访问数据库,否则WS从数据库中获取数据,并该数据更新到缓存当中。对于写操作来说,WS首先更新数据库,然后删除对应的缓存项目,这里的选择在于Facebook并没有在写入数据后更新缓存而是选择删除缓存项,这是因为删除是幂等的而更新不是幂等的,如果选择更新缓存那么就会引入并发和重复执行问题,由于缓存并不是权威的数据来源,因此删除操作是不影响数据安全的。

由于Facebook部署的是一个分布式缓存系统,因此在一个Memcached集群中,缓存通过一致性哈希分配到各个Memcached Server,WS可能需要请求多个缓存服务器来获取所需的内容,每个请求可以查询多个缓存项目,在Facebook中平均一个请求查询24个Key。

为了提高请求的效率,Facebook将大部分机制实现在Memcached Client当中,即缓存客户端而不是缓存服务器,这是因为客户端是无状态的,因此实现起来比较简单。客户端执行序列化、压缩、路由、错误处理、请求批量化等操作,同时客户端还会维护数据之间的依赖关系图(Directed Acyclic Graph,DAG)以最大化每个请求所能获取的数据量。在这个机制下,缓存服务器之间不需要沟通,所有的流程都以缓存客户端为中枢。同时,对于读请求来说RPC使用UDP协议,对于写请求来说RPC使用TCP协议,同时缓存客户端实现全局的类似于TCP的拥塞控制机制。

在写操作之后,WS只会删除本地缓存服务器上的缓存,集群中的其它缓存服务器仍然保留过时的缓存,这些缓存的失效是由后端的Storage集群来负责的,每个MySQL事务都将包含其对应的缓存标识,并且在事务执行完成后将这些缓存标识定期成批发送给McSqueal服务器,这些服务器将负责删除这些缓存。有了这个机制后,为什么还需要让WS删除本地缓存服务器上的缓存呢?这是为了快速让本地缓存达成一致,防止对单个用户写入后读取相同的数据却得到过时数据的情况。

为什么不让WS来负责失效所有缓存呢?原因是McSqueal的效率更高,因为缓存的失效是批量的、流水线的,而且后端的Storage集群中的MySQL可以通过保存日志来提供必要的容错恢复机制。

缓存雪崩当大量数据缓存在同一时间过期时,大量的请求将导向数据库服务器。解决缓存雪崩的方法一般有:(1)随机化缓存过期时间(2)互斥锁:保证只有一个请求可以构建新的缓存(3)后台更新缓存:业务层不负责更新缓存也不设置缓存过期时间,而是让缓存永久有效,缓存的更新由后台系统完成。Facebook使用的是类似于互斥锁的方法,当缓存未命中时,缓存服务器将授予请求的WS一个Lease(租约),只有拥有Lease的WS才能更新该缓存项,其它没有Lease的WS只能等待,同时限制每十秒内同个Key最多授予一个Lease,如果拥有Lease的缓存服务器宕机了那么一段时间后缓存服务器将再次授予Lease给另一个WS。

缓存击穿当某一个热点数据缓存过期后,大量的请求导向数据库服务器。类似于缓存雪崩,解决缓存击穿的方法一般有:(1)互斥锁(2)由后台更新热点数据缓存

缓存穿透当被大量访问的数据既不在缓存中,也不在数据库中,导致系统压力骤增。解决缓存穿透的方法一般有:(1)将恶意请求提前拦截,避免其访问缓存和数据库(2)缓存空值或者默认值(3)使用布隆过滤器快速判断数据是否存在,避免频繁查询数据库。

Facebook同样使用Lease机制来防止Stale Set问题,这个问题发生的原因在于发送到缓存服务器上的写操作可能是乱序的,最后一次执行的写操作可能并不是最新的数据,此时缓存中将保存过时的数据,直到下一次更新缓存,通过Lease缓存服务器可以识别出哪个写操作才是最新的。

如何解决缓存服务器的宕机问题呢?如果缓存服务器宕机了,那么其Lease机制也就失去作用,第一个安全措施是缓存客户端的路由机制,如果客户端认为集群中的某个Memcached服务器宕机了,那么它将请求导向另一个Memcached服务器,但是这种方法是有局限性的,因为两个服务器所拥有的缓存重合度可能并不高,因此切换之后会有一段时间的数据库请求高峰。如果整个Memcached集群都宕机了,第一个安全措施就没用了,为了应对这种情况,Facebook单独设置了一个Gutter集群,这些集群保持空闲状态,如果某个Memcached集群宕机了,那么该Region内的所有缓存工作将由Gutter集群顶替。

Facebook将Memcached集群中的服务器以Pool为单位进行管理,根据不同的应用场景、访问模式和缓存热度来分配不同的Pool,热缓存(Hot Key)可能被分配到容量更大的Pool,并在Pool内进行复制。同时Facebook支持缓存客户端将缓存导向各个随机的Memcached集群,但这么做的话请求就变成跨集群的了,请求的效率不高,但可以增加缓存的容错率,对于冷缓存来说可以进行跨集群复制,热缓存则不合适。

Facebook提供缓存预热机制,当一个FE集群刚刚刚启动时,其中的Memcached还没有任何缓存项,此时大量的请求将导向数据库,Facebook允许冷集群中的Memcached在刚刚启动时向热集群中的Memcached请求缓存,直到缓存填充到某个标准。在预热的过程中还需要一些机制保证一致性,这是因为集群间的缓存同步是缓慢的(注意在MySQL的主从机制下写操作必须先发送到主实例,然后同步到从实例,因此主从复制是有一定延时的,这就导致了集群间的缓存同步是比较缓慢的),如果冷集群中的某个客户端更新了某个缓存,而热集群中的缓存还未更新,此时冷集群从热集群中访问的将是过期的缓存数据。

另外,由于MySQL主从复制的高延迟,如果某个客户端写入了某个数据,在写入时它确实会更新本地集群上的缓存,但这个缓存可能由于某些原因(如Memcached服务器宕机或者缓存驱逐)被删除,此时如果MySQL主从复制仍未完成,客户端将查询到过期的数据。为了解决这个问题,当Slave集群想要执行某个写操作时,它必须标记该数据项,并由MySQL主从复制完成后删除该标记,所有对标记数据项的访问都应该直接查询主数据库。


COPS

COPS全称Clusters of Order-Preserving Servers,是一种提供Causal Consistency(因果一致性)的可扩展的KeyValue数据库。所谓Causal Consistency是一种比较弱的一致性保证,通俗地说即在一定的因果关系下,因操作总是发生于果操作之前,在COPS中因果关系(~)由以下规则确定:

对于同一个客户端上下文来说,如果操作A先于操作B执行,那么A~B;

如果读操作B返回的值由写操作A写入,那么A~B;

传递性:如果A~B且B~C,那么A~C。

和最终一致性(Eventual Consistency)不同的是,因果一致性不会返回失序的状态,而最终一致性则有可能。例如,对于一个相册服务来说,如果用户上传了图片P同时将该图片加入相册L,此时相当于两个操作:UploadPhoto和AddPhotoReferenceToAlbum,在最终一致性下可能会出现相册中已经添加了对该图片的引用但图片还未上传的情况,但在因果一致性下如果相册已经包含对该图片的引用,那么该图片一定已经上传。

在上图中,get(y)获取了由put(y)写入的值,因此put(y) ~ get(y),以此类推,箭头表示因果关系。如果A~B和B~A都不满足,那么操作A和操作B就可以并发执行而不会出错,例如两个不相干的PUT操作可以以任何顺序进行复制,而对相同的Key进行的两个PUT操作则会发生冲突,在这种情况下需要有一种既定的机制来处理冲突,常见的原则是Last-Writer-Wins,也称为Thomas's Write Rule,这种机制始终以时间上更晚的PUT操作为优先,COPS就采取这种原则。

COPS如何表达操作的时间顺序呢?通过Lamport Timestamp,这是一种逻辑时钟机制,可以为分布式系统下的操作规定一种时间顺序。

COPS的另一个优势在于,在相对较弱的因果一致性下COPS以很小的代价来实现写操作仅仅只需等待本地服务器(本地集群)的响应,数据是在因果顺序下异步复制的,这样写操作性能很高。

COPS实现因果一致性的核心是通过数据的版本号(Versions)和依赖(Dependencies),COPS使用一个上下文(Context)来描述一个客户端实例,一个客户端可以同时运行多个上下文,每个上下文都维护当前实例执行的操作以来所需要的依赖,如果操作A的对象是数据a,操作B的对象是数据b,则A~B相当于a~b,因此a必须先于b写入,此时称a为b的依赖之一。COPS实现因果一致性的原则即仅当该数据的所有依赖都已经写入后才能写入该数据用户通过客户端接口来使用COPS,所提供的接口有三个:get、put和get_trans,这些接口由客户端库(Client Library)实现,客户端库使用另一组接口和服务器沟通,这组接口包括get_by_vers、put_after和dep_check

每个数据中心运行一个COPS集群,一个集群包括一个完整的数据副本,无论是get还是put操作都直接发送到本地集群中的服务器,集群内的服务器分片负责不同部分的数据,也可以对少量的数据进行复制,这种复制是通过Chain Replication来实现的,操作无需等待集群间复制完成即可返回,集群间的复制是异步发生的:每个数据(键)在集群中都有一个主节点(Primary Node),其它集群中相同键的主节点称为等效节点,集群间的复制是通过主节点异步发送到等效节点来完成的,当某个集群中主节点接收到来自其它等效节点的复制请求,它等待该数据的依赖全部满足后执行该操作。

代码示例如下:

实例每次执行完一个操作之后,都会根据上述的因果关系确定规则来更新其依赖,如下图。对实例中涉及的每个数据(键),实例记录其最小依赖(Nearest Deps),最小依赖是通过完整依赖(All Deps)优化精简得到的,这是由于依赖关系相当于一个有向无环图,根据传递性可以化简。在默认的COPS版本中,一旦一个实例执行完成一个put操作,它便可以清空执行put操作之前存在的所有依赖,这是因为put操作所产生的依赖项将隐式包含之前所有的依赖,这是一种优化措施。客户端库将实现并执行如下接口:

<bool,vers> ← put_after (key, val, [deps], nearest, vers=∅)

bool ← dep_check (key, version)

<value, version, deps> ← get_by_version (key, version=LATEST)

当客户端发起put操作时,客户端库将调用put_after接口向本地集群发送请求,该请求包括数据键、数据值、数据的最小依赖或者完整依赖、以及版本号。当负责该数据的主节点服务器接收到该请求时,它将创建新版本并保存相关依赖到当前上下文,随后等待所有依赖都已写入后才写入该数据。对于单个客户端来说,put操作是串行的,因为后执行的put必须等待先执行的put完成之后才能获取正确的版本号,这可以保证COPS中版本号和依赖关系是同步的,版本号低的一定先于版本号高的写入,版本号高的如果已经写入那么版本号低的依赖一定满足。如果集群内对该键还进行了复制,那么必须等待Chain Replication完成,当上述过程完成后,本地集群内的状态就达到一致,可以响应客户端。

随后主节点将异步地将该put请求发送到其它集群中的等效节点,这些等效节点执行相同的操作,但在执行之前它必须等待所有依赖都已写入到本集群中的服务器,此时它通过dep_check接口向本集群中的其它服务器询问相关的依赖项是否满足,阻塞等待所有服务器都满足依赖后才执行操作。

对于get操作也是类似的,客户端库将调用get_by_version接口向本地集群发送请求,该请求包括数据键及其版本号,用户可以查询最新的版本,也可以查询指定的版本号,如果指定的版本还未写入,主节点必须阻塞等待。

注意到客户端库中还有一个get_trans接口,该接口在默认的COPS版本是不提供的,仅仅在COPS-GT版本提供。get_trans接口可以同时查询多个键,并且COPS-GT保证所有键的查询结果都是一致的,通过COPS的get接口是无法保证这一点的,这是因为连续的get请求之间数据可能已经部分变化,这样两次get请求得到的数据就不属于同个一致的状态了。COPS-GT版本为了实现get_trans接口,必须保留数据的旧版本,同时对于每个请求还要附带数据的完整依赖而不是最小依赖,大致的原理是乐观并发控制+重试,这里不再赘述。

COPS实现了写请求无需等待跨集群复制,这是有代价的:一旦发生宕机那么本地集群中还未复制到其它集群的操作将丢失,并且由于COPS还需要不断地回收上下文中不会再用到的依赖项,发生宕机可能导致大量的依赖项无法被回收,因为宕机的服务器还需要这些依赖项,这种情况下只能等到宕机恢复。


一致性模型

Read Your Writes(读自写入)

确保同一个Client写入后立即读取相同的数据项,可以观察到自己的写入。

注意:属于会话一致性,即只在同个Client中保证,不能保证Client读取相同数据项时立即观察到另一个Client的写入。

Monotonic Writes(单调写)

如果Client对同个数据项先执行写入W1再执行写入W2,那么确保每一个Client先观察到W1再观察到W2,即同个Client不可能在观察到W2后观察到W1。

Monotonic Reads(单调读)

如果Client对同个数据项执行读取R1,那么此后对其的读取操作R2要么返回和R1相同的数据,要么返回比R1更新的数据,即同个Client不可能观察到已执行的读操作以前的状态。

Writes Follow Reads(读后写入)

如果Client对同个数据项执行写入W1,然后执行读取R1观察到W1,再执行写入W2,那么W2必须在和R1相同的状态或者比R1更新的状态上执行,即W1必须先于W2可见,换句话说:一旦你读取到某些东西,你就无法改变它的过去。

上述介绍的一致性都属于会话一致性,其保证的范围都是在单个Client中,对于不同的Client的执行顺序没有限制。


PRAM Consistency (Pipeline Random Access Memory,也称FIFO Consistency)

等同于Read Your Writes + Monotonic Reads + Monotonic Writes,但不满足Writes Follow Reads。

PRAM保证每个Client观察到(写)操作的顺序:如果这些(写)操作属于同个Client发起,那么观察到它们的顺序将与其发起的顺序一致,但不同Client发起的不同(写)操作允许以任何顺序被观察。由于读操作对其它Client没有影响,因此上述的操作均可特指写操作。

Causal Consistency(因果一致性)

所有Client将以相同的顺序观察到具有因果关系的写入操作。例如,Client执行的读取R1观察到了写入W1的值,那么R1将依赖于W1或称W1是R1的原因,那么所有Client都必须在观察到W1之前观察到R1。对于没有因果关系的写入操作,不保证其顺序。

不能保证读取操作能读取到最新的值。

Sequential Consistency(顺序一致性)

the result of any execution is the same as if the (read and write) operations of all processes on the data store were executed in some sequential order, and the operations of each individual processor appear in this sequence in the order specified by its program.

对每一个Client,这些操作的执行顺序就好像所有操作都以某种串行顺序执行一样,并且操作的执行顺序和这些操作的发起顺序相同。但对同个操作在不同的Client的执行顺序是没有要求的,对多个Client的操作顺序不能作出任何同步保证。

不能保证读取操作能读取到最新的值。

Linearizability(线性一致性)

线性一致性是个很强的一致性模型。在单线程下,对于一连串读写操作来说,尽管操作调用(Invocation)响应(Response)之间有一段时间,但由于操作之间互不重叠,因此从外部看来这些操作似乎是原子的,约定用V代表调用,S代表响应,如下表示单线程情况下三个操作的执行时间线:

V1-----S1          V2-----S2          V3-----S3

这种调度的正确性显然,而且易于推理,线性一致性的目标就是在多线程(Client)下也实现这样的调度。如:

ClientA:AV1---------------AS1                                                         AV2---------------AS2
ClientB:                BV1---------------BS1                          BV2---------------BS2

在上述操作序列中,A和B分别执行两个操作,并且第一个操作和第二个操作分别重叠,线性一致性要保证A和B的操作执行顺序(也称History,历史)和单线程下具有相同的良好性质,如果一个历史满足下列条件,那么称其是线性的(Linearizable)

①对于同个Client的操作,其顺序和发起顺序是一致的。

②如果操作OP2在操作OP1完成(响应)后启动(调用),那么OP1必须先于OP2,即OP2必须观察到OP1。

③如果操作OP1和操作OP2是重叠的,那么不保证顺序的先后,但所有Client都将观察到一致的顺序。

在上述的例子中,历史必须满足以下条件,其中~代表先于:

由因果律:AV1~AS1,AV2~AS2,BV1~BS1,BV2~BS2

由①:AS1~AV2,BS1~BV2

由②:AS1~BV2,BS1~AV2

对这个例子,满足以上约束的所有历史如下,这些历史都是线性的:

所有这些历史在外界看来,其中的每个操作似乎都是顺序执行的、原子的,和单线程情况类似的,如

AV1-----BV1-----AS1-----BS1-----AV2-----AS2-----BV2-----BS2 似乎 A1-B1-A2-B2 或 B1-A1-A2-B2

BV1-----BS1-----AV1-----AS1-----AV2-----BV2-----AS2-----BS2 似乎 B1-A1-A2-B2 或 B1-A1-B2-A2

由于重叠的原因,一个历史可能似乎多个单线程执行顺序。现在,只要实际的调度可以在满足①②③的条件下交换顺序并得到至少一个线性的历史,那么这个调度就是线性一致的。在实际开发中,枚举所有的历史是不切实际的,往往通过在并发中设置一个线性化点(Linearization Point)来实现,所有的操作都将在线性化点中被串行,并在该点强迫线性一致性。

在线性一致性下,读取操作将得到最新写入的值。

Strong Consistency(强一致性)

强一致性在不同的上下文中可能附加不同的含义,但其核心是统一的,即分布式系统中任何时刻对任何节点的访问都将返回相同的状态,并且任何读取都将获得最新写入的值,PRAM、因果一致性、顺序一致性严格来说都不属于强一致性,因为节点的状态无法同步或者同步总是需要延迟,而线性一致性是强一致性的一种,后面介绍的可串行化也是强一致性的一种。

Weak Consistency(弱一致性)

和强一致性相反,任何时刻对任何节点的访问都可能返回不一致的状态。

Eventual Consistency(最终一致性)

属于弱一致性的一种,任何时刻对任何节点的访问都可能返回不一致的状态,但最终这些节点都将统一到一致的状态。

上述介绍的一致性不仅在单个Client的会话中保证顺序,而且对Client之间的顺序也加以不同强度的限制,但这些一致性的控制粒度都是单个操作,接下来介绍的一致性属于事务一致性(或称隔离级别),它们控制事务之间的执行顺序,其中每个事务包含一系列的操作。


在事务一致性中,事务间的某些操作顺序会引发冲突,不同的事务一致性将不同程度地限制这些顺序的发生,包括如下几种类型:

Dirty Write(脏写):W1X ... ... W2X,即一个事务的写操作覆盖另一个事务所写入但未提交的值。如果允许脏写,事务可能无法读取到自己的写入。

Dirty Read(脏读):W1X ... ... R2X,即一个事务可以读取到另一个事务所写入但未提交的值。如果允许脏读,可能发生Aborted Read或Intermediate Read问题。

Aborted Read(读将回滚):W1X ... ... R2X ... ... Rollback1,此时R2X将读取到本应回滚的值,所有读将回滚问题一定发生脏读,但脏读不一定引发读将回滚问题。

Intermediate Read(读中间值):W1X ... ... R2X ... ... W1X ... ... Commit2,此时R2X将读取到其它事务所写入的值,但该值并不是那个事务最后写入该对象的值。所有读中间值问题一定发生脏读,但脏读不一定引发读中间值问题。

Fuzzy Read(读后被写):R1X ... ... W2X,即一个事务所读取的对象在事务未提交前被另一个事务所写入。如果允许读后被写,事务的连续读取将无法得到相同的值,而且可能会引发Lost Update或Unrepeatable Read问题。

Unrepeatable Read(不可重复读):R1X ... ... W2X ... ... R1X,此时事务1的连续两次R1X将读取到不同的值。

Lost Update(更新丢失):R1X ... ... R2X ... ... W2X ... ... W1X,其中W2X和W1X不是盲写,此时W2X将丢失,所有更新丢失问题一定发生读后被写,但读后被写不一定引发更新丢失问题。

Read Partially Committed(读部分提交):W1X ... ... R2X ... ... R2Y ... ... W1Y ... ... Commit1,此时发生了脏读和读后被写,事务2只能读取到事务1提交的部分数据。或 W1X ... ... R2Y ... ... W1Y ... ... Commit1 ... ... R2X,此时发生了读后被写(但没有发生脏读),事务2也只能读取到事务1提交的部分数据。

Phantom(幻影):R1P … ... W2(y in P),即一个事务所读取的对象集合随后被另一个事务增加、删除或修改其中的成员。

其中... ...表示除事务提交外的其它操作。

Read Uncommitted(读未提交)

读未提交具有不同的解释,主流解释认为它仅禁止Dirty Write。在读未提交的隔离级别下,脏读、读后被写(包括不可重复读、更新丢失、读部分提交)、幻影均可能发生。

Read Committed(读已提交)

读已提交具有不同的解释,主流解释认为它禁止Dirty Write和Dirty Read。在读已提交的隔离级别下,脏写、脏读不可能发生,读后被写(包括不可重复读、更新丢失、读部分提交)、幻影均可能发生。

Cursor Stability(游标稳定)

游标稳定加强了读已提交,它禁止Dirty Write、Dirty Read和Lost Update。在游标稳定的隔离级别下,脏写、脏读、更新丢失不可能发生,不可重复读、读部分提交、幻影均可能发生。

Monotonic Atomic View(单调原子视图)

单调原子视图加强了读已提交,它实现了事务ACID特性中的原子性,如果事务A观察到了另一个事务B的某个写入,那么事务A必须能够观察到事务B的所有写入,即事务A要么观察不到事务B的任何写入,要么观察到事务B的所有写入,这正是原子性。

它禁止Dirty Write、Dirty Read和Read Partially Committed。在单调原子视图的隔离级别下,脏写、脏读、读部分提交不可能发生,不可重复读、更新丢失、幻影均可能发生。

Repeatable Read(可重复读)

可重复读保证游标稳定和单调原子视图,它禁止Dirty Write、Dirty Read和Fuzzy Read。在可重复读的隔离级别下,脏写、脏读、读后被写(包括不可重复读、更新丢失、读部分提交)均不可能发生,幻影可能发生。

Snapshot Isolation(快照隔离)

快照隔离加强了单调原子视图(因而保证读已提交),在外界看来所有事务都在一个独立的、特定的快照上执行,仅当事务提交时将快照应用到真实的数据库,快照应用之后开启的事务将原子地观察到之前事务的所有写入。如果事务A创建快照并写入对象X,在事务A提交之前事务B创建快照并也写入了对象X,此时事务A必须中止。

快照隔离在单调原子视图的基础上,隐含如下性质:

在同个事务中读取操作将观察到该事务最新的写入。

如果事务A对事务B可见,那么事务B的读取操作必须观察到事务A的写入。

事务以相同的顺序对所有节点可见。

如果两个事务写入同个对象,其中一个事务必须对另一个事务可见。

Serializability(可串行化)

可串行化保证可重复读和快照隔离。可串行化保证事务的执行顺序似乎和事务的某个串行执行顺序所产生的效果相同,这一点和线性一致性有些类似,但却有本质的不同,线性一致性强迫了操作的执行顺序,但可串行化仅仅要求事务执行后的效果相同,并不强迫事务的真正执行顺序。

给定一组事务,枚举其所有的串行执行顺序,每个串行执行顺序最后产生的状态都是正确且无冲突的,可串行化保证事务的真正执行顺序必定产生这些状态的其中一个,这是个很强的保证,不过由于可串行化并不强迫事务的真正执行顺序,因此必须保证外界无法观察到和串行执行过程不一致的中间状态。在实践中,实现可串行化的判断是比较困难的,一般是通过可串行化的一个子集来判断,这个子集即冲突可串行化(Conflict Serializability)

在可串行化的隔离级别下,脏写、脏读、读后被写(包括不可重复读、更新丢失、读部分提交)和幻影均不可能发生。

Strict Serializability(严格可串行化)

严格可串行化相当于可串行化和线性一致性,它不仅保证事务的执行顺序似乎和事务的某个串行执行顺序所产生的效果相同,还保证事务实际执行的顺序与该串行顺序相对应,即如果事务A在事务B启动前提交,那么事务A必须先于事务B实际执行。

External Consistency(外部一致性)

外部一致性比严格可串行化更强,它强迫事务的实际执行顺序和提交顺序是一致的,即如果事务A在事务B提交前提交,那么事务A必须先于事务B实际执行。

 

Leave a Reply

Your email address will not be published. Required fields are marked *