0%

lab1-mapReduce

最近开始学习MIT的分布式课程,首先做了lab1。

lab1的内容是mapReduce,关键有两部分,一是理解mapReduce的原理,二是实现分布式的mapReduce。

以下内容也从这两部分着手

MapReduce 的原理

下图是一个word count 程序在mapReduce上的原理图。本图仅仅涉及worker节点,master节点未画出。master节点主要起 给worker节点分配工作的作用。

mapreduce

顾名思义,mapReduce就是将可分割的任务划分成两个阶段完成

  • map阶段将原始数据(如文件内容)拆分成了一个个key-value pair的形式
  • reduce阶段将这些拥有相同key的pair聚集在一起,进行某种用户自定义的汇总运算。

mapReduce 的例子

  • 单词计数:map函数将content拆分成 {word, “1”}的形式;reduce将聚集在一起的相同的word的进行频率计数,输出{word, “1023”}的形式
  • 倒排索引:由单词找文档。map函数将{docName, content} 拆分为 {word, docName}的list;reduce将聚集在一起的相同的word,的对应的docName进行排序并汇总

分布式的MapReduce

1. master

分布式的MapReduce采用C/S的模式,其中master作为服务端,worker节点作为客户端。即分布式MapReduce的运行模式为:worker节点通过RPC不断向master节点请求任务,master节点根据自己的数据结构记录的任务情况来给worker节点分配任务(通过RPC返回任务的meta description)。worker节点获取任务meta信息后,通过分布式文件系统获取文件,并完成任务的计算。

Master的主要数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Master struct {
//待处理的n个文件
files []string

//map任务的meta描述,凭借这个来分配map任务
mapTasks []Task
mapTaskPtr int

//reduce任务的meta描述,凭借这个来分配reduce任务
reduceTasks []Task
reduceTaskPtr int

//当前已完成的task计数,用于判断每个阶段的任务是否完成
mapCompleteCnt int
reduceCompleteCnt int
//mutex互斥保护变量:RPC是并发的,因此需要互斥访问Master的meta data
mutex sync.Mutex
}

描述任务的数据结构如下:

1
2
3
4
5
6
7
8
type Task struct {
//任务类型,分为:Map任务,Reduce任务,IDLE空闲任务
taskType_ int

taskId_ int
//任务状态:待分配、已分配、已完成
taskState_ int
}

Master主要需要实现两个RPC接口供Worker调用:

  • AskForTask:请求任务,该接口会根据任务分配和完成情况,返回一个task。具体来说,当所有map任务完成后,才可以继续分配reduce任务。如果所有map任务都已经分配出去,但并没有全部收到commit,说明部分map任务还在被执行,此时如果有worker请求任务,应该给它分配IDLE任务。
  • CommitTask:提交任务,该接口会将一个已分配任务的状态改为已完成

2. worker

worker作为客户端,不断调用Master提供的AskForTask请求任务,当获得一个任务后,就开始执行,执行完毕后调用CommitTask提交任务。然后循环继续请求新的任务。

worker每次请求任务都是通过RPC,当RPC向Master节点发起连接时,如果连接失败,说明master损坏,or整个mapReduce任务已经完成且master已退出。此时worker也退出即可。

worker执行任务的流程则如文章一开始画的图一样。

3. 错误处理

master故障:只需要将master节点的meta-data信息周期性写入磁盘,当master故障时,重启一个新节点并load这个meta-data即可继续执行master的功能。当然一般来说master是不允许有故障的,如果出现了故障,直接停止整个程序的运行,让用户检查故障也是一种方法

worker故障:如果worker发生了故障,则worker的任务需要由其它worker重新完成

  • master端:master分配一个任务给worker后,会启动一个协程来监控任务的完成情况。具体来说,可以设置一个定时器,当10s后检查该任务是否已经由worker提交,且任务的状态改为“已完成“。如果否,则说明worker可能存在故障,只需要将该任务从 “已分配”的状态改为“待分配”,即可。
  • worker端:worker的任务提交必须是原子性的,即worker在运行中生成的文件是私有的,可以将其命名为temp文件。只有当任务完成后,才将temp文件改成标准命名的文件,使得其它worker节点可见,然后再向master节点commit任务。这样做的原因是,如果worker在执行任务的过程中直接使用标准命名的文件,当work节点挂掉后,其它work节点会重做该task,但此时出现了file name的冲突。

4. go编程的一些note

刚开始尝试go编程,记录一些go的语法

  • RPC

    1
    2
    3
    4
    5
    6
    7
    8
    9
    //1.RPC传递的struct的成员必须大写
    type TaskRequest struct {
    Pad string
    }

    //2.RPC的接口函数格式; 请求消息和返回消息通过函数参数传递,因此需要使用引用传递;返回值必须是error类型,若没有错误则返回nil
    func (m *Master) CommitTask(request *CommitRequest, response *CommitResponse) error{
    return nil
    }
  • fmt 相关

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //1. %v可打印各种变量
    fmt.Printf("open file [%v] failed %v", fileName, err)

    //2. Sprintf 格式化输出字符串
    str := fmt.Sprintf("tmp-mr-%v-%v", mapTaskId, i)

    //3. Sscanf 从字符串格式化读入数据; n表示读到了几个数据
    n, err := fmt.Sscanf(str, "mr-%v-%v", &map_id, &reduce_id)

    //4. Fprintf 向文件中格式化输出数据
    fmt.Fprintf(filePtr, "%v %v\n", key, val)
  • json

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //1. 直接把struct数据写入到文件中,以json格式
    encoder := json.NewEncoder(filePtr)
    err := encoder.Encode(kvaStruct)

    //2. 从文件中把json解码,并读入为struct
    decoder := json.NewDecoder(f)
    for {
    kv := KeyValue{}
    if err := decoder.Decode(&kv); err != nil {
    break
    }
    kva = append(kva, kv)
    }
  • 读写文件相关

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //1.利用ioutil读文件,得到的buf类型为[]byte;readAll读入该文件所有的内容
    buf, err := ioutil.ReadAll(file)

    //2.利用ioutil读取目录,得到的是每个文件的描述struct(不是文件指针,也不是fileName)
    allFilesDescription, err := ioutil.ReadDir("./")
    for _, fileDes := range allFilesDescription {
    fileName := fileDes.Name()
    }

    //3. 利用filePtr.Read只能读取缓冲区大小的数据
    buf := make([]byte, 1024)
    n, err := filePtr.Read(buf) //读到的只是1024 bytes大小的数据,不会读完全文