0%

lab4-Sharded Key/Value Service

MIT分布式原理课程的lab4。最近一直在忙科研上的事情,外加五一假期玩了几天,一直到今天才完成lab4。这篇文章记录lab4的具体情况。lab4的实验分为两个部分:shardmaster和shardKV,这两部分合起来构成一个完整的分布式key-value服务系统。

我们在lab3已经完成过一个基于raft的分布式kv服务了,但lab3的系统使得整个系统的流量都汇集在一台机器上(即集群的leader节点上),这导致了性能问题。在lab3中,所有client的request都是先访问集群的leader,再由leader将request转换为command并借助raft下发到所有replica机器,达成共识后整个集群一起执行这条command,然后再由leader将执行结果返回给client。可以看出来,集群的leader承担了所有流量。

而lab4的任务就是解决上述性能问题。具体来说,我们将所有的kv数据按照key进行划分,比如最简单的方法,按照key的开头字母进行划分,将所有客户端发出的key划分为‘a’到‘z’的26个集合,每个集合称为一个shard。此外我们有若干个基于raft的分布式集群(每个集群都是一个独立的raft系统),每个集群只需要存储和处理这26个shard中的一个子集。这样,每个client都可以根据我们的划分规则,找到key对应的处理的集群,然后向这个集群发起请求,达到了分担负载的目的。

在实现中,我们不能使用固定分配的规则决定哪个集群负责哪些key。原因有两点:一是请求是动态的,这就导致不同时刻不同集群的负载不同,有时候我们需要在集群之间调整任务实现负载均衡,比如把负载过大的集群上的一些shard调整给负载较小的集群,使之能分担一些任务;二是因为,在分布式系统中,我们想实现一个随时允许集群加入和离开的系统,因此我们要给新加入的集群分配一些shard,把要离开的集群的shard分配给其它集群。

因此,我们需要把”shard如何划分“做成动态的,每个集群定期查询最新的”划分方案“,然后调整自己的负载。这个任务划分的方案称为config。我们需要一个raft集群专门负责给其它集群提供config的查询服务,这个集群叫做master,它仅仅负责提供config查询。其它集群负责具体的key-value的处理,称为shardkv集群。因此lab4的实现也分为两个部分,A部分实现shardmaster,B部分实现shardkv。

整个系统的原理如下:

shardKV

A. shardmaster

由上图知道,shardmaster的任务有两个,1)供client查询任意历史config和最新config;2)当client发出移除group或者新增group的指令时,生成一个新的config(要达到负载均衡,即每个移除或新增group后,系统中现存group的任务量要尽量一致)。

一个config如下:

1
2
3
4
5
type Config struct {
Num int // config number
Shards [NShards]int // shard -> gid
Groups map[int][]string // gid -> servers[]
}

Num表示config的编号,通过比较Num就可以知道两个config谁新谁旧。Shard是一个数组,记录着shardId的对应groupId,即记录某个shard分配给哪个group。Group记录每个group内部的所有机器的name。

如何得知一个key的对应shardId是多少呢?lab4中的方案是 shardId = hash(key) mod Nshards。

我们要做的,就是当client发来一个leave group 或者 join group指令后,根据新的group情况重新生成一个config。

join group

当有新的group加入后,我们需要从原有的group上拿出一些shard分配给新的group。原则是:要分配的尽量均匀,并且尽量使shard的移动次数尽量少(因为一个shard就是数据库的一部分数据,移动一次的传输开销很大)。

一个最简单但不可行方案是把所有的N个shard全部按顺序重新分配,比如一共10个shard,原来2个group,新增1个group之后,一共3个group,则group0得到0、1、2号shard,group1得到3、4、5号shard,group2得到6、7、8、9号shard。但这么做违背了上述”使shard的移动次数尽量少“的原则。

一个可行的办法是,可以按照新的group个数计算出每个group平均得到avg个shard。然后遍历每个group,如果有group持有的shard多于avg个,就把多于avg的shard放进一个池子里。然后把池子里的shard按一定的原则逐一分配出去。

这里有一个要注意的地方,就是上述算法是在一个shardmaster集群内的所有raft节点上都执行的,我们必须保证所有节点按照上述算法执行完后,得到相同的结果。因此,我们需要保证算法的一致性。因此在上述算法的基础上,将多于avg的shard放进池子里后,我们分配池子里的shard给group时,为保证一致性,需要一定的原则,这里人为制定一下:

  • 分配池子里的shard时,按照shardId从小到大的顺序,一个一个分配。
  • 具体把这个shard分配给哪个group呢?分配给目前拥有shard个数最少的group。如果有多个group符合,则分配给groupId最小的那个group。

leave group

当现有的group离开后,我们需要把原本它负责的shard分配给其它group。仍然遵循均匀分配和移动shard次数尽量少的原则。

方法是,把离开的group负责的shard放入池子内。然后依次把池子里的每个shard分配给现存的group。我们仍然需要注意一致性原则。具体细节和join group中的一样。

查询config

很简单,没什么可说的。

具体实现

  • client

    client端,和lab3的raftkv一样,遵循线性原则,即:只有当这个请求得到结果后,才会发起下一个请求。考虑到网络环境差可能导致的request的失序、丢失等问题,我们需要给每个client设置一个operationId,初始化为0,每发起一个新的request,就递增operationId,表示上一个request已经得到结果了。在服务端,当收到比当前已经处理过的opeartionId更小的id,说明这是一个失序的request,直接丢弃即可(也可以回复一个ErrOperationOld错误,虽然对端没人接收)。

  • server端

    • 线性原则:主要考虑当重复的或者更旧的operationId到来时,如何处理的问题。

    • snapshot:和lab3一样,也需要做snapshot。shardmaster的状态机包括什么呢?应该至少包括:最后一条被应用到状态机的raft日志的任期lastApplyTerm、最后一条被应用到状态机的raft日志的索引lastApplyIndex、真正的状态机: configs列表、用来实现线性原则的记录每个client最后一条被应用的operation的operationId的lastApplyOperationId

    • 通过负载均衡原则生成新config的算法,上文已经谈到了

bug

  • raft的提交问题。如果我们通过raft.Start()提交的指令是某个request,比如LeaveArgs,请注意一个隐秘的bug。即我们很可能通过raft.Start提交的是LeaveArgs * 类型,在raft leader使用go语言的RPC工具向其它replica节点同步这条指令时,go会自动把LeaveArgs * 指针类型转换成正常的LeaveArgs类型然后再通过RPC发送出去,从而replica节点收到的是LeaveArgs类型的数据。但raft leader自己留下的却是LeaveArgs * 指针类型。

    当applyLoop开始应用数据时需要把interface数据转回LeaveArgs类型, 此时raft leader会转换失败,而其它replica节点会转换成功。导致出现leader无法apply指令,而其它节点都成功apply指令的错误。

实验结果

image-20210516201206118

B. ShardKV

shardKV是kv存储的实现关键。它主要有两个任务,一是正常对外提供key-value的GET和PUT服务,这个大致和lab3的raftkv实现一致,唯一的细节就是server端每次在GET或PUT一个kv pair时,需要先检查这个key是否由自己负责,如果不是,需要返回error,其它的不多赘述;二是每个shardKV集群都需要定期从shardmaster处拉取新的config,并根据这个config调整自己的shard的负责情况,接下来主要记录一下这部分的实现。主要可以分为如下几部分:拉取并更新config、根据config调整自己的shard。

更新config

在一个shardKV集群中,我们需要一个loop定期从shardmaster拉取config。由谁去拉取呢?应该由集群的raft leader 作为一个client,向shardmaster拉取。整个流程为:raft leader 持有一个shardmaster的client对象,并通过它向shardmaster发起Query config的请求。在得到shardmsater的回复并拿到config后,通过leader的raft层把config下发到集群中所有节点,然后所有节点一起执行(apply)这个config

shardmaster中记录了所有的历史config,每次拉取时,应该拉取哪个呢?是最新的config呢还是当前shardkv集群的config的下一个config呢?应该是当前config的下一个config。因为每个shardkv集群的拉取config的进度不一样,因此如果每次都拉取最新的config,很可能会跳过一部分config,导致出现错误。

比如:在config1中,A集群负责shard0,在config2中,B集群负责shard0,在config3中,C集群负责shard0。假设当前ABC集群都处于config1,接着B集群pull config,发现最新的config是config2,它直接跳到config2,接着它发现自己需要获取shard0,从哪里拿呢?它看了一下自己旧的config1,认为shard0在A处,于是它从A处拿到了shard0。过了一会,C集群pull config时,此时最新的config是config3,于是C直接跳到config3,这时C发现自己也需要shard0,它该从哪里拿呢?C看了一下自己的旧的config0,发现shard0在A处,于是C也从A处拿到了shard0。则在此刻,B和C都持有并负责shard0,显然这是错误的。

因此更新config,每次只拿比当前Num大1的config才是正确的。比如上面的场景中,C先过渡到config2,然后再到config3,此时发现自己需要shard0,从哪里获取呢?C查看自己的旧的config2,发现B持有shard0,于是去B索取。如果B开始执行config3了,就会同意C的请求并把自己的shard0给C,如果B还没到config3,就会拒绝这次请求,C请求shard0失败。由此,整个系统中始终只有一个集群持有shard0。

如何apply这个config?集群中所有节点拿到config后,根据config将与自己相关的shard分为3类:原先持有的这次依旧需要持有的shard、原先没有的这次需要持有的shard、原先持有的这次需要抛弃的shard。其中后两种称为needShards和discardShards。

  • 原先持有,这次依旧持有的shard:仍旧放在自己的数据库中,且对外提正常的PUT、GET服务。
  • needShards:需要定时去持有它的shardKV集群索要数据,拿到后放入自己的数据库
  • discardShards:先把它从正常服务的dataBase中移出去,放在一个专门的容器中,等待它的新config下的持有人索要。

注意:只有当所有的needShards都获取到了之后,才视为本次config apply完毕,才允许拉取和apply下一个config。每拿到一个needShard,就立即将它加入自己的database,并立即启动该shard的服务。而discardShards就一直留着,什么时候对方索要成功后,什么时候再删掉。

调整shard

一个shard如下所示,若干个这样的shard就组成了一个shardKV集群的database。

1
2
3
4
5
6
type ShardData struct {
ShardId int
ConfigNum int
Data map[string]string //key-value
RecentApplyOpId map[int64]int64 //clientId-opId
}

在apply完config后,就需要根据这个config来调整shard了。我们只需要对needShards和discardShards进行处理,且关键是needShards。获取needShards有推和拉两种方式,我在这里采用了拉的方式,即需要这个shard的节点主动向持有者请求该shard。

集群内部由谁负责拉取shard?应该由本集群的raft leader负责拉取,当拉取到正确configNum下的shard后,leader通过raft将shard下发给集群内的所有replica节点,然后所有节点一起执行这个shard(将其纳入database,并开启该shard的对外服务)

discardShards的处理?discardShards内存放的是本集群不要的,而其它集群需要的shards。在其它集群通过拉取的方式得到discardShards里的一个shard后,应该通知discardShards的持有集群:可以把这个shard删除掉了,不然长期存着占空间。具体实现方式为:在shard的需求方A集群拉取到一个shard并成功apply到所有节点后,由A集群的leader负责通知对方B可以删除这个shard,然后B收到这个RPC后,由B集群内的leader发起一个删除shard的command,下发到集群内部的所有节点,然后内部所有节点一起执行删除指令。最后由B集群的leader返回给A一个OK表示成功删除。

shardKV的client

官方代码基本上把client端事先实现好了,但由于和以往的流程不太一样,所以还是记录一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (ck *Clerk) Get(key string) string {
args := GetArgs{Key: key, ClientId: ck.clientId, OperationId: ck.operationId}
ck.operationId++

for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
// try each server for the shard.
for si := 0; si < len(servers); si++ {
srv := ck.make_end(servers[si])
var reply GetReply
ok := srv.Call("ShardKV.Get", &args, &reply)
if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
return reply.Value
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
if ok && (reply.Err == ErrWrongLeader) {
continue
}
if ok && (reply.Err == ErrRaftTimeout) {
continue
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}

return ""
}

首先opeartionId必不可少,这是为防止request的失序、丢失。

client的流程变成了:先向shardmaster请求最新的config,根据config查看自己的key应该发往哪个shardkv集群。然后再向该集群定向发送request请求。

challenge exercises

  • Garbage collection of state

    意思是A集群不要的shards,就不需要一直保留了,不然几次config折腾下来,会占存储大量丢弃的shards,占据大量的空间。但如果A集群把它删掉了,那B集群需要这个shard,该怎么办呢?

    办法前文已经给出了:每次apply新的config时,把当前集群的shards分为三类,继续持有的shards、needShards和discardShards。对于本challenge,假设集群A的discardShards中就包含了B需要的shard,那只有当B收到这个shard并成功apply后,由B通知A,A才能从discardShards中把该shard删掉。

  • Client requests during configuration changes

    大意是,在更改config的过程中,由于调整并传输shards是个费时的操作,所以我们希望在传输shards的过程中能尽量对外继续提供GET、PUT等服务。

    1)我们可以在调整shards的过程中,对三类shard中的继续持有shards继续开放服务。这个在上文已经实现。

    2)在拉取多个needShards的时候,拉取成功一个shard,就立即开启这个shard的对外服务。这个也已经实现。

一些bug

  • 依旧是raft.Start()时下发了一个指针类型的数据,导致raft leader 节点 apply的时候转换类型失败的错误。
  • 忘记重置某个timer

实验结果

image-20210516222359550