0%

lab2-raft

最近完成了6.824的lab2,记录一下笔记。

lab2的内容是实现raft协议。整个实验的内容分为3部分,lab2-A实现raft的选举机制,lab2-B实现raft的日志追加机制,lab2-C实现raft的持久化。

完成实验的过程整体上可以分为两方面,其一是理解raft算法原理和细节,其二是设计和具体实现。

以下内容也从这两方面分别介绍

raft 算法原理和细节

为了对大规模数据进行处理,单机就成为了瓶颈,解决的办法就是使用更多的机器协同工作。而由于计算机本身的物理特性,其存在一定的崩溃概率,另外计算机网络也是计算机集群工作时出现故障的潜在原因(网络分隔、数据包丢失、数据包未按序到达)。如何使得由网络连接的计算机之间能协调一致工作,在某个机器故障or网络故障后,整个系统依然能正常运转,这就是一致性算法的目标。一致性算法用来组织机器执行指令,并使得其最终状态是一致的,在这个过程中能容许部分节点的故障or失败。

raft是一种管理复制日志的一致性算法,它主要分为三个部分,选举、日志复制、安全性。raft算法会从整个集群中选举出一个leader,其它节点称为follower。每次客户端的command会先记录在leader的日志中,然后由leader负责同步给follower。当一条日志同步数目过半后,leader就可以commit这条日志,并把它应用到自己的状态机。说白了采用raft算法的系统中是采用投票机制来达成共识的,因此3个节点的系统中,只要2个节点一致了,整个系统就算做达成了共识。所以3个节点的系统可以容忍1个节点故障,5个节点的系统可以容忍2个节点的故障。

下图是raft原论文中的,理解了这张图,外加一些选举安全性的说明,实现raft算法就基本没问题了。

raft0

选举

raft使用心跳机制来实现leader的感知。即leader周期性的广播心跳信息给follower,follower由此得知leader的存在。如果follower在一段时间内没有收到心跳数据(可能是leader和它之间的网络中断了,也可能是leader故障了),则它会认为leader已经死了,因此自己会变成candidate,并发起投票。如果它可以得到半数的票,就可以当选为新的leader。

follower变成canditate之后立即发起投票,这包含如下几个动作:term加一,自己投自己,重置自己的选举计时器,然后群发自己的请求投票RPC。

接收者收到candidate的投票请求后,先检查term。term相当于一个逻辑时钟,标志着现在发起投票的人处于哪一时代。如果term比自己的老,则可以直接拒绝。否则,结合日志的term、index(candidate的日志必须和自己的一样新or比自己的还新),voteFor(未投过票就投给它,投过它就直接返回给它true,已经投给别人了就返回false)来判断是否投票给它。

当candidate变成leader以后,他需要立即群发心跳信息,以阻止其它人继续选举。相应地,如果一个节点收到了term比自己新的心跳信息,它就知道已经有leader了,自己就需要老老实实做follower。

为避免大家同时开始发起选举,然后都自己给自己投票,陷入僵局,可以把每个人的选举计时器都设置为随机时间,比如在300ms到400ms之间浮动。

对于任何接收者来说,如果接收到的RPC消息中的term比自己的term大,自己就需要转变为follower

另外,voteFor什么时候重置为空呢?由voteFor的定义可以知道,它是代表本节点在当前term投票给了谁,因此代码中任何发生Term变动的地方,都需要重置voteFor。

复制日志

上层应用把cmd发送给leader,然后leader把它写入自己的日志,然后把自己的日志传送给集群中的大多数节点。由于由于心跳和日志追加都是leader发送给其他人的,因此两者可以合并起来,当追加日志的RPC中log为空,就表示这是心跳信息,否则这既是心跳信息,也是日志追加信息。具体实施就是:每次该发送心跳的时候,如果有需要传送的日志,就带上,没有就只发送空消息。

复制日志的大体流程是:上层应用调用leader的start,把cmd传送给它。leader把cmd添加到自己的log中,在下一次心跳计时器到期后,发送给follower,follower会回复是否成功复制到自己的log,如果成功,则leader在自己的matchIndex数组记录一下。根据matchIndex,如果有半数以上的节点都复制了这条消息,leader就把它commit掉。在下一次心跳时,leader会告诉每个follower自己的commit的进度,然后follower会跟随leader也commit。

会有一个后台线程,不断把commit的cmd应用到状态机,一旦一条日志应用到了状态机,它就可以被返回给上层应用。

由于集群中任何时刻都可能有节点故障,因此存在follower的日志和leader的日志不一致的情况。根据raft的思想,以leader为主,冲突的follower的日志需要服从leader的日志(这是因为leader是得到大多数选票而选出来的,这也意味着它的日志比大多数人的都要新or一样新),服从leader也就是少数服从多数。

处理冲突需要leader和follower协商,一直找到双方都一致的那个点,然后重传从那个点以后的所有日志。协商机制可以通过leader的nextIndex数组实现。nextIndex数组在leader刚选出来的时候初始化为leader的日志末尾的下一个index,然后leader开始发送log后,如果收到follower的回复为false,表示这些日志没有匹配上,因此leader开始递减nextIndex,发送新的从nextIndex到末尾的日志过去,这样反复直到匹配成功。匹配的原理是,leader的某个index上的日志,必须和其它节点在该index上的日志一致。而我们不能靠检查日志内容来断定是否一致,而是通过Term来判断。即不同节点上的确定的index上的日志的term相等,则他们一致。

matchIndex表示双方截至目前匹配成功的index,在leader刚选举出来的时候初始化为0,表示leader不知道当前的匹配情况,然后可以随着leader和follower协商处理冲突日志的成功,而得知当前的匹配进度,从而更新matchIndex。matchIndex表示了leader和集群内所有节点的一致性匹配情况,因此可以用于提交log。当leader检查matchIndex,发现某一条日志被复制到了大多数节点后,就可以更新commitIndex,实现提交。当然,根据论文中的fig8,提交还有一个小细节,即leader更新commitIndex的这一条日志的Term必须是当前Term,不能是旧的Term,这一点随后解释

安全性

  1. 选举安全:领导人的log中必须包括所有已提交的条目,换句话说,只有包含所有已提交的条目的节点才能被选为领导人。

    实现这一点,是通过投票限制实现的:follower节点只会给日志比自己新or和自己一样新的人投票。这里的新的定义是:看最后一条日志,如果其Term更大,那就更新。如果Term相等,就看日志Index的大小或者说日志的长度,长度更长的就更新。

    因此为了方便比较,candidate会在请求投票的RPC中携带最后一条日志的term和index,方便follower比较。

  2. Leader不允许提交之前Term的日志条目,只能提交当前term的日志条目。

    raft1

    如上图所示,s1在term1时当选为leader,复制了2条消息,然后故障崩溃。s5在term2时当选为leader(凭借s3 s4和自己的票),然后复制了1条消息,后故障崩溃。然后s1在term3时当选为leader,把黄色的消息给s3复制了一份,此时黄色消息已经在集群中复制过半了。如果允许s1作为leader提交这条黄色的消息,则存在可能,s1将消息应用到了自己的状态机,而s2和s3还没来得及commit or 应用黄色消息,就被图(d)中新当选的leader s5覆盖成了蓝色消息,随后提交蓝色消息并apply。此时,系统中出现了两种状态,在index=2的位置,s1为黄色消息,s2位蓝色消息。

    为了应对这种不一致现象,规定,图(c)时刻,s1作为leader只能通过提交自己term的红色消息的方式间接提交旧term的消息,即s1观察到红色消息复制到了大多数,然后推进commitIndex为红色消息的index,从而间接把之前term的黄色消息也提交了。而不能直接通过观察到黄色消息已经过半,而推进commitIndex为黄色消息的index,因为这有可能会导致不一致状态

lab2实现

代码实现

系统中主要有三个线程负责系统的功能,分别是:heartBeat线程,负责定期发送心跳信息or追加日志;election线程,当选举计时器超时后,负责发起选举;apply线程,负责在后台定期监控当前是否有可以apply的日志(当applyIdx < commitIdx时表示有可以应用的日志)。这三个线程以loop死循环的形式存在,直到上层应用发起kill命令后才退出。

loop内部需要周期性执行handle函数,采用了go内置的定时器,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (rf *Raft) electionMonitorLoop() {
for {
select {
case <-rf.stopCh:
return
case <-rf.electionTimer.C:
rf.electionHandle()
}
}
}

func (rf *Raft) heartBeatSendLoop() {
for {
select {
case <-rf.stopCh:
return
case <-rf.heartBeatTimer.C:
rf.heartBeatHandle()
}
}
}

func (rf *Raft) applyLoop() {
for {
select {
case <-rf.stopCh:
return
case <-rf.applyTimer.C:
rf.applyHandle()
}
}
}

当计时器到期后,进入对应的handle函数,并在函数内部先重置自己的计时器,然后处理相应的事件。

关于timer的重置,有一些坑,下面的版本是基本正确的版本。坑主要在:当正在执行某个handle时,有可能新的定时的时间又到期了,这时底层会把信号写入到timer.C的channel中,此时直接重置timer是没用的,因为信号已经写入channel了,下个循环依旧会执行handle函数,想要下个循环不执行这个handle函数,需要把这个信号从channel消耗掉。

但是直接消耗也不对,因为stop函数返回false有两种可能,其一是timer到期了,信号写入channel,但没有被消耗;其二是timer到期了,信号写入channel,已经被消耗了。如果看到stop为false,就直接去消耗channel,就可能会阻塞在这里。因此可以采用select的方式,如果当前channel有数据,就消耗,如果没有,就走default分支。

1
2
3
4
5
6
7
8
9
10
func (rf *Raft) resetHeartBeatTimer() {
if !rf.heartBeatTimer.Stop() {
select {
case <-rf.heartBeatTimer.C:
default:
}
}
rf.heartBeatTimer.Reset(HeartBeatTimeoutMs)
}

一些bug

  1. 选举:candidate发送请求投票的RPC并返回结果、统计票数时,不必等所有的RPC都返回结果了再统计数据,尤其是不能阻塞等所有RPC都返回才统计,因为集群中可能存在网络故障or节点故障,某个RPC可能很久才返回结果or永远都无法返回结果。正确的做法是,统计支持的票数和返回的票数,支持票数过半即判定为成功,返回票数达到一半即判定为失败。

  2. 选举:RPC调用相当耗时,注意在RPC调用前释放锁。

  3. 日志追加:go语言的slice是引用,a := b[1:3],a仅仅是指向b的引用,不是拷贝。因此在准备appendEntry的RPC的参数时,参数里的日志必须显式调用copy,从raft节点上复制日志,不能直接用引用的方式。否则,由于rpc的call之前会释放锁,而rpc参数又引用了raft的log数据,会发生data race

  4. 日志追加:follower节点根据leader的log数据覆盖自己的log的时候,需要考虑到多个RPC数据的先后顺序不一致的问题。比如,leader先发送[1,2],后发送[1,2,3,4]。但由于网络拥塞,长日志先到,短日志后到,如果在follower端没有处理好如何覆盖的问题,有可能会导致短日志覆盖了长日志。

    也不能仅仅覆盖leader发来的日志数据。假如follower中数据为[1,2,3,4,a,b,c,d],从1开始冲突,而leader发送的数据为[101,102,103,104],如果仅仅覆盖的话,follower会变成[101,102,103,104, a,b,c,d],而我们期待的是[101,102,103,104]。

    因此正确的做法是:拿到leader的log数据后,先按顺序匹配,直到匹配到不一致的地方,表示从此处到结尾的数据都是冲突的,删除从此处一直到结尾的数据,再把leader log匹配剩下的数据追加到follower

  5. 日志追加:需要考虑到网络延迟、数据包未按序到达的情况。日志追加时,leader收到follower的reply的顺序不一定是按序的。比如先发送[1:5]的log数据,许久未收到对应的reply,后leader收到上层通过start发送的消息多了几条,在下次心跳时发送了[1:10]的数据给follower。过了很久,[1:10]的reply先到,leader将这个follower的matchIndex改为10,随后[1:5]的reply后到,leader此时不能把matchIndex再改为5。