最近开始学习MIT的分布式课程,首先做了lab1。
lab1的内容是mapReduce,关键有两部分,一是理解mapReduce的原理,二是实现分布式的mapReduce。
以下内容也从这两部分着手
MapReduce 的原理
下图是一个word count 程序在mapReduce上的原理图。本图仅仅涉及worker节点,master节点未画出。master节点主要起 给worker节点分配工作的作用。
顾名思义,mapReduce就是将可分割的任务划分成两个阶段完成
- map阶段将原始数据(如文件内容)拆分成了一个个key-value pair的形式
- reduce阶段将这些拥有相同key的pair聚集在一起,进行某种用户自定义的汇总运算。
mapReduce 的例子
- 单词计数:map函数将content拆分成 {word, “1”}的形式;reduce将聚集在一起的相同的word的进行频率计数,输出{word, “1023”}的形式
- 倒排索引:由单词找文档。map函数将{docName, content} 拆分为 {word, docName}的list;reduce将聚集在一起的相同的word,的对应的docName进行排序并汇总
分布式的MapReduce
1. master
分布式的MapReduce采用C/S的模式,其中master作为服务端,worker节点作为客户端。即分布式MapReduce的运行模式为:worker节点通过RPC不断向master节点请求任务,master节点根据自己的数据结构记录的任务情况来给worker节点分配任务(通过RPC返回任务的meta description)。worker节点获取任务meta信息后,通过分布式文件系统获取文件,并完成任务的计算。
Master的主要数据结构如下:
1 | type Master struct { |
描述任务的数据结构如下:
1 | type Task struct { |
Master主要需要实现两个RPC接口供Worker调用:
- AskForTask:请求任务,该接口会根据任务分配和完成情况,返回一个task。具体来说,当所有map任务完成后,才可以继续分配reduce任务。如果所有map任务都已经分配出去,但并没有全部收到commit,说明部分map任务还在被执行,此时如果有worker请求任务,应该给它分配IDLE任务。
- CommitTask:提交任务,该接口会将一个已分配任务的状态改为已完成
2. worker
worker作为客户端,不断调用Master提供的AskForTask请求任务,当获得一个任务后,就开始执行,执行完毕后调用CommitTask提交任务。然后循环继续请求新的任务。
worker每次请求任务都是通过RPC,当RPC向Master节点发起连接时,如果连接失败,说明master损坏,or整个mapReduce任务已经完成且master已退出。此时worker也退出即可。
worker执行任务的流程则如文章一开始画的图一样。
3. 错误处理
master故障:只需要将master节点的meta-data信息周期性写入磁盘,当master故障时,重启一个新节点并load这个meta-data即可继续执行master的功能。当然一般来说master是不允许有故障的,如果出现了故障,直接停止整个程序的运行,让用户检查故障也是一种方法
worker故障:如果worker发生了故障,则worker的任务需要由其它worker重新完成
- master端:master分配一个任务给worker后,会启动一个协程来监控任务的完成情况。具体来说,可以设置一个定时器,当10s后检查该任务是否已经由worker提交,且任务的状态改为“已完成“。如果否,则说明worker可能存在故障,只需要将该任务从 “已分配”的状态改为“待分配”,即可。
- worker端:worker的任务提交必须是原子性的,即worker在运行中生成的文件是私有的,可以将其命名为temp文件。只有当任务完成后,才将temp文件改成标准命名的文件,使得其它worker节点可见,然后再向master节点commit任务。这样做的原因是,如果worker在执行任务的过程中直接使用标准命名的文件,当work节点挂掉后,其它work节点会重做该task,但此时出现了file name的冲突。
4. go编程的一些note
刚开始尝试go编程,记录一些go的语法
RPC
1
2
3
4
5
6
7
8
9//1.RPC传递的struct的成员必须大写
type TaskRequest struct {
Pad string
}
//2.RPC的接口函数格式; 请求消息和返回消息通过函数参数传递,因此需要使用引用传递;返回值必须是error类型,若没有错误则返回nil
func (m *Master) CommitTask(request *CommitRequest, response *CommitResponse) error{
return nil
}fmt 相关
1
2
3
4
5
6
7
8
9
10
11//1. %v可打印各种变量
fmt.Printf("open file [%v] failed %v", fileName, err)
//2. Sprintf 格式化输出字符串
str := fmt.Sprintf("tmp-mr-%v-%v", mapTaskId, i)
//3. Sscanf 从字符串格式化读入数据; n表示读到了几个数据
n, err := fmt.Sscanf(str, "mr-%v-%v", &map_id, &reduce_id)
//4. Fprintf 向文件中格式化输出数据
fmt.Fprintf(filePtr, "%v %v\n", key, val)json
1
2
3
4
5
6
7
8
9
10
11
12
13//1. 直接把struct数据写入到文件中,以json格式
encoder := json.NewEncoder(filePtr)
err := encoder.Encode(kvaStruct)
//2. 从文件中把json解码,并读入为struct
decoder := json.NewDecoder(f)
for {
kv := KeyValue{}
if err := decoder.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}读写文件相关
1
2
3
4
5
6
7
8
9
10
11
12//1.利用ioutil读文件,得到的buf类型为[]byte;readAll读入该文件所有的内容
buf, err := ioutil.ReadAll(file)
//2.利用ioutil读取目录,得到的是每个文件的描述struct(不是文件指针,也不是fileName)
allFilesDescription, err := ioutil.ReadDir("./")
for _, fileDes := range allFilesDescription {
fileName := fileDes.Name()
}
//3. 利用filePtr.Read只能读取缓冲区大小的数据
buf := make([]byte, 1024)
n, err := filePtr.Read(buf) //读到的只是1024 bytes大小的数据,不会读完全文