本文主要是介绍MIT6.824 Lab1 MapReduce 通关攻略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1. 简介
在这个 Lab 中,我们将建立一个 MapReduce 库,作为使用 GO 编程和构建一个分布式容错系统的初步。在第一个部分,我们将会写一个简单的 MapReduce 程序。在第二个部分,我们将会写一个 Master,去向 workers 分发任务已经 handle workers 的错误。
2. Software
2.1. Go 环境
官方下载 go1.9 源码golang.org
sodu tar -C /usr/local -xzf go1.9.linux-386.tar.gz
Add /usr/local/go/bin
to the PATH environment variable. You can do this by adding this line to your /etc/profile (for a system-wide installation) or $HOME/.profile:
sudo vim ~/.profile
// 此条语句加入到文件中
export PATH=$PATH:/usr/local/go/bin// 保存退出后执行,使修改生效
source $HOME/.profile
2.2. 6.824 Code
$ git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
$ cd 6.824
$ ls
Makefile src
当前的Map/Reduce
实现支持两种操作模式,顺序和分布式。 在前者中,map和reduce每次执行一个任务:首先,第一个map任务执行完成,然后是第二个,然后是第三个,等等。当所有map任务完成后,第一个reduce任务运行,然后是第二个,等等。这种模式虽然不是很快,但对于调试很有用。 分布式模式运行许多 worker 线程,这些线程首先并行执行映射任务,然后执行 reduce 任务。 这要快得多,但也难以实现和调试。
3. 序言:熟悉源代码
Over the course of the following exercises, you will have to write/modify doMap
, doReduce
, and schedule
yourself. These are located in common_map.go
, common_reduce.go
, and schedule.go
respectively. You will also have to write the map
andreduce
functions in ../main/wc.go
.
4. Part I: Map/Reduce input and output
现有的代码缺少了两个关键部分:划分 map 任务输出的函数以及为 reduce 收集所有输入的函数。这两个任务分别在doMap()
function in common_map.go
, and the doReduce()
function in common_reduce.go
中执行。
-
Go 文件读写,引入
os
包,其包含常用的文件操作。 -
hash.fnv
FNV算法属于非密码学哈希函数,它最初由Glenn Fowler和Kiem-Phong Vo于1991年在IEEE POSIX P1003.2上首先提出,最后由Landon Curt Noll 完善,故该算法以三人姓的首字母命名。
FNV算法目前有三种,分别是FNV-1,FNV-1a和FNV-0,但是FNV-0算法已经被丢弃了。FNV算法的哈希结果有32、64、128、256、512和1024位等长度。
4.1. 实现
common_map.go
文件中的doMap()
函数工作流程
- 打开输入到 map task 文件;
- 以文件大小
make
一个byte slice,再读取文件内容; - 将文件 data 传入需要用户编写的
mapF(filename string, contents string)
函数中,其函数返回 slice KeyValue。 - 根据 job name ,map task and reduce task创建中间文件’
- 根据 kv.key值,将逻辑上相等的 k-v 经过JSON Encoder 编码后写入相应的 file 中, 此处逻辑相等是使用了
hash-fnv-32
过程。
file, err := os.Open(inFile)if err != nil {log.Fatal(err)}defer file.Close()fileInfo, err := file.Stat()if err != nil {log.Fatal(err)}data := make([]byte, fileInfo.Size())_, err = file.Read(data)if err != nil {log.Fatal(err)}// mapkeyV := mapF(inFile, string(data))for i:=0; i<nReduce; i++ {// generate reduce file namerName := reduceName(jobName, mapTask, i)rFile, err := os.Create(rName)if err != nil {log.Fatal(err)}// gracefuldefer rFile.Close()enc := json.NewEncoder(rFile)for _, kv := range keyV {// 将意义上相等的 key 写入同一个文件中if uint32(ihash(kv.Key))%uint32(nReduce) == uint32(i) {err := enc.Encode(&kv)if err != nil {log.Fatal(err)}}}}
common_reduce.go
的实现流程:
- 读入属于自己的中间文件
- 解码出 key-value 值, 此时一个key会对应多个 value,因此 map 使用
map[string][]string
形式。 - 将各个 k-v (每个reduce task 是可能接收多种Key的)值存储到 map 中
- 创建 merge 文件
- 遍历 k-v map,将其传入 reduceF 中进行reduce 操作。最后将其返回值 Encode 进 merge file 中。
// map key映射一个字符串数组 slicekeyValues := make(map[string][]string)for i := 0; i < nMap; i++ {fileName := reduceName(jobName, i, reduceTask)// open reduce相应的 filefile, err := os.Open(fileName)if err != nil {log.Fatal(err)}defer file.Close()// decodedec := json.NewDecoder(file)for {var kv KeyValueerr := dec.Decode(&kv)if err != nil {break;}// key 是否已存在映射_, exist := keyValues[kv.Key]if !exist {keyValues[kv.Key] = make([]string, 0)}keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)}}mergeFileName := mergeName(jobName, reduceTask)mergeFile, err := os.Create(mergeFileName)if err != nil {log.Fatal(err)}defer mergeFile.Close()enc := json.NewEncoder(mergeFile)for k, _ := range keyValues {ret := reduceF(k, keyValues[k])err := enc.Encode(&KeyValue{k, ret})if err != nil {log.Fatal(err)}}
5. Part II: Single-worker word count
接下来我们要实现一个简单的 Map/Reduce 例子——单词计数。我们需要实现main/wc.go
中的mapF()
以及reduceF()
函数。单词判断需要使用到unicode
包。
-
func IsLetter(r rune) bool
,IsLetter reports whether the rune is a letter (category L). -
应该还需要使用到 string 的切分操作,文档中给出提示使用
FieldsFunc
,func FieldsFunc(s string, f func(rune) bool) []string
。其可以按传入的判断函数进行字符串的切分。我们此处刚好可以传入 IsLetter 。不过我们的逻辑是,如果不是letter 才切分,所以再使用时要注意对返回值进行取反一次。
5.1. 实现
func mapF(filename string, contents string) []mapreduce.KeyValue
,词频统计很好理解,我们将 contents 中存在的每个 word 都映射一个 “1” 字符串即可。在 reduce 时对其进行加和就可以了。
func mapF(filename string, contents string) []mapreduce.KeyValue {// Your code here (Part II).keyV := make([]mapreduce.KeyValue, 0)// 切分字符串splitS := strings.FieldsFunc(contents, func(c rune)bool {return !unicode.IsLetter(c)})for _, s := range splitS {keyV = append(keyV, mapreduce.KeyValue{s, "1"})}return keyV
}
func reduceF(key string, values []string) string
,reduce 时, 我们可以直接遍历所有的values,使用strconv
包将字符串转化为整型。实际上,因为values 都是”1“,我们是否可以有函数直接求出values 的长度呢?即有多少个字符串,这样就可以省去遍历过程。实际运行是可以的!
func reduceF(key string, values []string) string {// Your code here (Part II).count := 0for _,v := range values {i, err := strconv.Atoi(v)if err != nil {log.Fatal("reduceF: error", err)}count += i;}return strconv.Itoa(count)
}/* 直接求 values 的长度 */
func reduceF(key string, values []string) string {// Your code here (Part II).count := 0count += len(values)return strconv.Itoa(count)
}
6. Part III: Distributing MapReduce tasks
当前我们只是一次运行一个 map 或 reduce 任务。MapReduce 最大的亮点之一是它可以在不需要开发者做额外的工作前提下,自动地并行普通的顺序化代码。 在lab的这个part,我们需要实现一个 MP 版本,它能为并行在多核环境下的 worker 线程划分任务。
mapreduce/master.go
中的代码做了大部分管理 MP任务的工作。我们的任务是实现schedule
inmapreduce/schedule.go
。The master calls schedule()
twice during a MapReduce job, once for the Map phase, and once for the Reduce phase. 一般来说,任务数会大于 worker 数。schedule()
需要等待所有任务完成才 return。
6.1. 实现
master.go
首先在Distributed
中注册了一个匿名函数传递进了run()
中,然后在run()
中调用了两次schedule
。在执行真正的schedule 之前,forwardRegistrations
将可用的 worker
写入了 channel 中,因此我们只需在 schedule 中读 ch即可。
go mr.run(jobName, files, nreduce,func(phase jobPhase) {ch := make(chan string)go mr.forwardRegistrations(ch)schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)},func() {mr.stats = mr.killWorkers()mr.stopRPCServer()})return
客户端向 Server 发送RPC请求时,需要构造一个 DoTaskArgs 参数。args := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}
type DoTaskArgs struct {JobName stringFile string // only for map, the input filePhase jobPhase // are we in mapPhase or reducePhase?TaskNumber int // this task's index in the current phase// NumOtherPhase is the total number of tasks in other phase; mappers// need this to compute the number of output bins, and reducers needs// this to know how many input files to collect.NumOtherPhase int
}
根据几点提示,需要了解 RPC、Channel、WaitGroup的基本操作方法。不过我们等待所有task 都完成,同样可以使用 ntasks 个 chan 实现。以下是最终的实现代码
// 等待所有 goroutines finish 需要用 sync.WaitGroup 实现var waitGp sync.WaitGroupwaitGp.Add(ntasks)for i:=0; i<ntasks; i++ {// client dials the server, but we don't konw the server's address and port!// common_rpc 已经写好了服务器连接函数 call !args := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}go func(args DoTaskArgs) {for {wrk := <- registerChan// send a Worker.Dotask to the Worker,ok := call(wrk, "Worker.DoTask", &args, nil)if ok {// 这样的 Done 貌似不能保证所有任务都完成,只能说任务开始。waitGp.Done()// 再写回利用registerChan <- wrkbreak}}}(args)}waitGp.Wait()fmt.Printf("Schedule: %v done\n", phase)
6.1.1. 注意点
- 在 goroutines 中 使用函数之外的var时需谨慎 !比如使用 变量 i 时,因为他们共享变量,当 goroutine 执行时,i已经等于20了,所以会导致数组索引超过范围。所以需要将我们构造好的参数传入 goroutine 中。
7. Part IV: Handling worker failures
在这个Part,我们要让 master 能 handle workers 的错误。MapReduce 是这相对简单了,因为 workers 没有一个持久的状态(persistent state)。如果工作程序在从主服务器处理RPC时失败,则主服务器的call()最终会因超时而返回false。在这种情况下, master 应该重新把这个任务分配给另一个 worker 。
7.1. 实现
实际上,我们在上一个 Part 就已经考虑到了 call 可能会执行失败的情况。所有我们通过一个 for 无限循环来保证任务的成功执行,只有当 call return ok 时,goroutine 才会 break。
8. Part V: Inverted index generation
反向索引,与我们经常接触的数组索引相反,数组索引通过数组下标也就是索引查元素,而反向索引可以通俗理解为通过字符查此字符的位置。从广义上讲,反向索引是从有关基础数据的有趣事实到该数据的原始位置的映射。
8.1. 实现
mapF
实现与之前的词频统计任务很类似,提取 contents 中的单词,然后建立一个{word, filename}的map 项(区别仅仅在这里)。
func mapF(document string, value string) (res []mapreduce.KeyValue) {// Your code here (Part V).splitS := strings.FieldsFunc(value, func(c rune) bool {return !unicode.IsLetter(c)})for _, v := range splitS {res = append(res, mapreduce.KeyValue{v, document})}return res
}
reduceF
,首先得到单词出现的次数,直接使用 len 求 values 的长度即可。然后遍历 values,分别加入到 返回值 string 中即可。不过格式化输出需要注意以下几点
- 最后一个文件名后没有逗号。
- 单词可能在一个文件中出现多次,最后 reduce 字符串添加时不要重复加入。
- 单词计数是指出现文件数目,不是单词在所有文件中出现的总数。
这样的话,我们也可以相应地修改 map,检查键值对是否存在而选择是否 append。咦,Go 的 map 可以允许插入相同的键值对?检查元素是否存在貌似比较麻烦。而且 map 逻辑是否应该做这种精简也是个问题。 不过这样确实会很大程度上增加中间文件的大小。在输出的中间文件中确实产生了相同键值对,我傻了, res是个KeyValue的 slice。{"Key":"it","Value":"pg-being_ernest.txt"} {"Key":"it","Value":"pg-being_ernest.txt"}
难道是因为 append 的缘故?
func reduceF(key string, values []string) string {// Your code here (Part V).var retTem stringbookNum := 0for _, v := range values {if !strings.Contains(retTem, v) {bookNum ++retTem += v +","}}// 为了代码短,不择手段。尾部处理return strconv.Itoa(bookNum) + " " + strings.TrimRight(retTem, ",")}
9. 最终结果
虚拟机跑程序真的特别慢, 官方文档中显示跑了两秒的程序,我这边跑了一分钟多!
==> Part I
ok mapreduce 62.651s==> Part II
Passed test==> Part III
ok mapreduce 81.767s==> Part IV
ok mapreduce 92.029s==> Part V (inverted index)
Passed test
10. 总结
此Lab的完成主要在建立对Map/Reduce
的理解上,以单词计数的例子可以很好的理解这个过程。master负责任务的调度、派发以及 handle workers的错误,执行doMap()
的worker将文件读入,用户编写的mapF()
进行具体的map逻辑(例如单词计数,将每个单词创建一个K-V键值对{“hello”, “1”}存入K-V slice中),doMap接收返回值后分类创建中间文件(以mapTask+reduceTask命名)并写入。
reduceTask 根据中间文件名读取对应文件(文件中含有多种单词的多个键值对),reduceTask对其分类形成map[string][]string
的形式,交由用户编写的reduceF
进行具体逻辑操作(字频统计则是对每个相同的key对应的 value 进行加和),最后生成类似 {"hello", "5"}
的k-v存入 merge 文件中。
要实现并行的MapReduce任务,就需要master对任务进行调度。这涉及到master与worker之间的RPC,利用Channel实现空闲worker的调度,以及使用WaitGroup
实现等待所有mapTask执行完毕后再执行reduceTask的调度。
实际上,本Lab的所有文件读写都只是在单机上完成,因为此时还未实现分布式文件系统。
这篇关于MIT6.824 Lab1 MapReduce 通关攻略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!