MIT6.824 Lab1 MapReduce 通关攻略

2024-01-11 02:18

本文主要是介绍MIT6.824 Lab1 MapReduce 通关攻略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 简介

在这个 Lab 中,我们将建立一个 MapReduce 库,作为使用 GO 编程和构建一个分布式容错系统的初步。在第一个部分,我们将会写一个简单的 MapReduce 程序。在第二个部分,我们将会写一个 Master,去向 workers 分发任务已经 handle workers 的错误。

2. Software

2.1. Go 环境

官方下载 go1.9 源码golang.org

sodu tar -C /usr/local -xzf go1.9.linux-386.tar.gz

Add /usr/local/go/bin to the PATH environment variable. You can do this by adding this line to your /etc/profile (for a system-wide installation) or $HOME/.profile:

sudo vim ~/.profile
// 此条语句加入到文件中
export PATH=$PATH:/usr/local/go/bin// 保存退出后执行,使修改生效
source $HOME/.profile

2.2. 6.824 Code

$ git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
$ cd 6.824
$ ls
Makefile src

当前的Map/Reduce实现支持两种操作模式,顺序和分布式。 在前者中,map和reduce每次执行一个任务:首先,第一个map任务执行完成,然后是第二个,然后是第三个,等等。当所有map任务完成后,第一个reduce任务运行,然后是第二个,等等。这种模式虽然不是很快,但对于调试很有用。 分布式模式运行许多 worker 线程,这些线程首先并行执行映射任务,然后执行 reduce 任务。 这要快得多,但也难以实现和调试。

3. 序言:熟悉源代码

Over the course of the following exercises, you will have to write/modify doMap, doReduce, and schedule yourself. These are located in common_map.go, common_reduce.go, and schedule.go respectively. You will also have to write the map andreduce functions in ../main/wc.go.

4. Part I: Map/Reduce input and output

现有的代码缺少了两个关键部分:划分 map 任务输出的函数以及为 reduce 收集所有输入的函数。这两个任务分别在doMap() function in common_map.go, and the doReduce() function in common_reduce.go中执行。

  1. Go 文件读写,引入os包,其包含常用的文件操作。

  2. hash.fnv

FNV算法属于非密码学哈希函数,它最初由Glenn Fowler和Kiem-Phong Vo于1991年在IEEE POSIX P1003.2上首先提出,最后由Landon Curt Noll 完善,故该算法以三人姓的首字母命名。

FNV算法目前有三种,分别是FNV-1,FNV-1a和FNV-0,但是FNV-0算法已经被丢弃了。FNV算法的哈希结果有32、64、128、256、512和1024位等长度。

4.1. 实现

  1. common_map.go文件中的doMap()函数工作流程
  • 打开输入到 map task 文件;
  • 以文件大小make一个byte slice,再读取文件内容;
  • 将文件 data 传入需要用户编写的mapF(filename string, contents string)函数中,其函数返回 slice KeyValue。
  • 根据 job name ,map task and reduce task创建中间文件’
  • 根据 kv.key值,将逻辑上相等的 k-v 经过JSON Encoder 编码后写入相应的 file 中, 此处逻辑相等是使用了hash-fnv-32过程。
     file, err := os.Open(inFile)if err != nil {log.Fatal(err)}defer file.Close()fileInfo, err := file.Stat()if err != nil {log.Fatal(err)}data := make([]byte, fileInfo.Size())_, err = file.Read(data)if err != nil {log.Fatal(err)}// mapkeyV := mapF(inFile, string(data))for i:=0; i<nReduce; i++ {// generate reduce file namerName := reduceName(jobName, mapTask, i)rFile, err := os.Create(rName)if err != nil {log.Fatal(err)}// gracefuldefer rFile.Close()enc := json.NewEncoder(rFile)for _, kv := range keyV {// 将意义上相等的 key 写入同一个文件中if uint32(ihash(kv.Key))%uint32(nReduce) == uint32(i) {err := enc.Encode(&kv)if err != nil {log.Fatal(err)}}}}
  1. common_reduce.go的实现流程:
  • 读入属于自己的中间文件
  • 解码出 key-value 值, 此时一个key会对应多个 value,因此 map 使用map[string][]string形式。
  • 将各个 k-v (每个reduce task 是可能接收多种Key的)值存储到 map 中
  • 创建 merge 文件
  • 遍历 k-v map,将其传入 reduceF 中进行reduce 操作。最后将其返回值 Encode 进 merge file 中。
	// map key映射一个字符串数组 slicekeyValues := make(map[string][]string)for i := 0; i < nMap; i++ {fileName := reduceName(jobName, i, reduceTask)// open reduce相应的 filefile, err := os.Open(fileName)if err != nil {log.Fatal(err)}defer file.Close()// decodedec := json.NewDecoder(file)for {var kv KeyValueerr := dec.Decode(&kv)if err != nil {break;}// key 是否已存在映射_, exist := keyValues[kv.Key]if !exist {keyValues[kv.Key] = make([]string, 0)}keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)}}mergeFileName := mergeName(jobName, reduceTask)mergeFile, err := os.Create(mergeFileName)if err != nil {log.Fatal(err)}defer mergeFile.Close()enc := json.NewEncoder(mergeFile)for k, _ := range keyValues {ret := reduceF(k, keyValues[k])err := enc.Encode(&KeyValue{k, ret})if err != nil {log.Fatal(err)}}

5. Part II: Single-worker word count

接下来我们要实现一个简单的 Map/Reduce 例子——单词计数。我们需要实现main/wc.go中的mapF()以及reduceF()函数。单词判断需要使用到unicode包。

  1. func IsLetter(r rune) bool,IsLetter reports whether the rune is a letter (category L).

  2. 应该还需要使用到 string 的切分操作,文档中给出提示使用FieldsFuncfunc FieldsFunc(s string, f func(rune) bool) []string。其可以按传入的判断函数进行字符串的切分。我们此处刚好可以传入 IsLetter 。不过我们的逻辑是,如果不是letter 才切分,所以再使用时要注意对返回值进行取反一次。

5.1. 实现

  1. func mapF(filename string, contents string) []mapreduce.KeyValue,词频统计很好理解,我们将 contents 中存在的每个 word 都映射一个 “1” 字符串即可。在 reduce 时对其进行加和就可以了。
func mapF(filename string, contents string) []mapreduce.KeyValue {// Your code here (Part II).keyV := make([]mapreduce.KeyValue, 0)// 切分字符串splitS := strings.FieldsFunc(contents, func(c rune)bool {return !unicode.IsLetter(c)})for _, s := range splitS {keyV = append(keyV, mapreduce.KeyValue{s, "1"})}return keyV
}
  1. func reduceF(key string, values []string) string,reduce 时, 我们可以直接遍历所有的values,使用strconv包将字符串转化为整型。实际上,因为values 都是”1“,我们是否可以有函数直接求出values 的长度呢?即有多少个字符串,这样就可以省去遍历过程。实际运行是可以的!
func reduceF(key string, values []string) string {// Your code here (Part II).count := 0for _,v := range values {i, err := strconv.Atoi(v)if err != nil {log.Fatal("reduceF: error", err)}count += i;}return strconv.Itoa(count)
}/* 直接求 values 的长度 */
func reduceF(key string, values []string) string {// Your code here (Part II).count := 0count += len(values)return strconv.Itoa(count)
}

6. Part III: Distributing MapReduce tasks

当前我们只是一次运行一个 map 或 reduce 任务。MapReduce 最大的亮点之一是它可以在不需要开发者做额外的工作前提下,自动地并行普通的顺序化代码。 在lab的这个part,我们需要实现一个 MP 版本,它能为并行在多核环境下的 worker 线程划分任务。

mapreduce/master.go中的代码做了大部分管理 MP任务的工作。我们的任务是实现schedule inmapreduce/schedule.go。The master calls schedule() twice during a MapReduce job, once for the Map phase, and once for the Reduce phase. 一般来说,任务数会大于 worker 数。schedule()需要等待所有任务完成才 return。

6.1. 实现

master.go首先在Distributed中注册了一个匿名函数传递进了run()中,然后在run()中调用了两次schedule。在执行真正的schedule 之前,forwardRegistrations将可用的 worker 写入了 channel 中,因此我们只需在 schedule 中读 ch即可。

go mr.run(jobName, files, nreduce,func(phase jobPhase) {ch := make(chan string)go mr.forwardRegistrations(ch)schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)},func() {mr.stats = mr.killWorkers()mr.stopRPCServer()})return

客户端向 Server 发送RPC请求时,需要构造一个 DoTaskArgs 参数。args := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}

type DoTaskArgs struct {JobName    stringFile       string   // only for map, the input filePhase      jobPhase // are we in mapPhase or reducePhase?TaskNumber int      // this task's index in the current phase// NumOtherPhase is the total number of tasks in other phase; mappers// need this to compute the number of output bins, and reducers needs// this to know how many input files to collect.NumOtherPhase int
}

根据几点提示,需要了解 RPC、Channel、WaitGroup的基本操作方法。不过我们等待所有task 都完成,同样可以使用 ntasks 个 chan 实现。以下是最终的实现代码

// 等待所有 goroutines finish 需要用 sync.WaitGroup 实现var waitGp sync.WaitGroupwaitGp.Add(ntasks)for i:=0; i<ntasks; i++ {// client dials the server, but we don't konw the server's address and port!// common_rpc 已经写好了服务器连接函数 call !args := DoTaskArgs{jobName, mapFiles[i], phase, i, n_other}go func(args DoTaskArgs) {for {wrk := <- registerChan// send a Worker.Dotask to the Worker,ok := call(wrk, "Worker.DoTask", &args, nil)if ok  {// 这样的 Done 貌似不能保证所有任务都完成,只能说任务开始。waitGp.Done()// 再写回利用registerChan <- wrkbreak}}}(args)}waitGp.Wait()fmt.Printf("Schedule: %v done\n", phase)

6.1.1. 注意点

  1. 在 goroutines 中 使用函数之外的var时需谨慎 !比如使用 变量 i 时,因为他们共享变量,当 goroutine 执行时,i已经等于20了,所以会导致数组索引超过范围。所以需要将我们构造好的参数传入 goroutine 中。

7. Part IV: Handling worker failures

在这个Part,我们要让 master 能 handle workers 的错误。MapReduce 是这相对简单了,因为 workers 没有一个持久的状态(persistent state)。如果工作程序在从主服务器处理RPC时失败,则主服务器的call()最终会因超时而返回false。在这种情况下, master 应该重新把这个任务分配给另一个 worker 。

7.1. 实现

实际上,我们在上一个 Part 就已经考虑到了 call 可能会执行失败的情况。所有我们通过一个 for 无限循环来保证任务的成功执行,只有当 call return ok 时,goroutine 才会 break。

8. Part V: Inverted index generation

反向索引,与我们经常接触的数组索引相反,数组索引通过数组下标也就是索引查元素,而反向索引可以通俗理解为通过字符查此字符的位置。从广义上讲,反向索引是从有关基础数据的有趣事实到该数据的原始位置的映射。

8.1. 实现

  1. mapF实现与之前的词频统计任务很类似,提取 contents 中的单词,然后建立一个{word, filename}的map 项(区别仅仅在这里)。
func mapF(document string, value string) (res []mapreduce.KeyValue) {// Your code here (Part V).splitS := strings.FieldsFunc(value, func(c rune) bool {return !unicode.IsLetter(c)})for _, v := range splitS {res = append(res, mapreduce.KeyValue{v, document})}return res
}
  1. reduceF,首先得到单词出现的次数,直接使用 len 求 values 的长度即可。然后遍历 values,分别加入到 返回值 string 中即可。不过格式化输出需要注意以下几点
  • 最后一个文件名后没有逗号。
  • 单词可能在一个文件中出现多次,最后 reduce 字符串添加时不要重复加入。
  • 单词计数是指出现文件数目,不是单词在所有文件中出现的总数。这样的话,我们也可以相应地修改 map,检查键值对是否存在而选择是否 append。咦,Go 的 map 可以允许插入相同的键值对? 检查元素是否存在貌似比较麻烦。而且 map 逻辑是否应该做这种精简也是个问题。 不过这样确实会很大程度上增加中间文件的大小。 在输出的中间文件中确实产生了相同键值对,{"Key":"it","Value":"pg-being_ernest.txt"} {"Key":"it","Value":"pg-being_ernest.txt"}难道是因为 append 的缘故? 我傻了, res是个KeyValue的 slice。
func reduceF(key string, values []string) string {// Your code here (Part V).var retTem stringbookNum := 0for _, v := range values {if !strings.Contains(retTem, v) {bookNum ++retTem += v +","}}// 为了代码短,不择手段。尾部处理return strconv.Itoa(bookNum) + " " + strings.TrimRight(retTem, ",")}

9. 最终结果

虚拟机跑程序真的特别慢, 官方文档中显示跑了两秒的程序,我这边跑了一分钟多!

==> Part I
ok  	mapreduce	62.651s==> Part II
Passed test==> Part III
ok  	mapreduce	81.767s==> Part IV
ok  	mapreduce	92.029s==> Part V (inverted index)
Passed test

10. 总结

此Lab的完成主要在建立对Map/Reduce的理解上,以单词计数的例子可以很好的理解这个过程。master负责任务的调度、派发以及 handle workers的错误,执行doMap()的worker将文件读入,用户编写的mapF()进行具体的map逻辑(例如单词计数,将每个单词创建一个K-V键值对{“hello”, “1”}存入K-V slice中),doMap接收返回值后分类创建中间文件(以mapTask+reduceTask命名)并写入。

reduceTask 根据中间文件名读取对应文件(文件中含有多种单词的多个键值对),reduceTask对其分类形成map[string][]string的形式,交由用户编写的reduceF进行具体逻辑操作(字频统计则是对每个相同的key对应的 value 进行加和),最后生成类似 {"hello", "5"}的k-v存入 merge 文件中。

要实现并行的MapReduce任务,就需要master对任务进行调度。这涉及到master与worker之间的RPC,利用Channel实现空闲worker的调度,以及使用WaitGroup实现等待所有mapTask执行完毕后再执行reduceTask的调度。

实际上,本Lab的所有文件读写都只是在单机上完成,因为此时还未实现分布式文件系统。

这篇关于MIT6.824 Lab1 MapReduce 通关攻略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592829

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

企业大模型落地的“最后一公里”攻略

一、大模型落地的行业现状与前景 大模型在多个行业展现出强大的应用潜力。在金融行业,沉淀了大量高质量数据,各金融平台用户数以亿计,交易数据浩如烟海。利用大模型分析处理这些数据,金融机构可以预测用户行为偏好,更高效、准确评估客户风险,实时监测交易和市场波动,及时制定策略。IDC 调研显示,超半数的金融机构计划在 2023 年投资生成式人工智能技术。 在科技领域,商汤人工智能大装置为大模型企业提

xss-labs-master通关教程

一.level1 先来进行一下代码审计 <?php ini_set("display_errors", 0);//关闭错误显示$str = $_GET["name"]; //接受URL来的get形式的name传参echo "<h2 align=center>欢迎用户".$str."</h2>";//在网页输出,并不是echo执行的,而是echo把HTML代码发送到浏览器,浏览器对发送的H

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

2025计算机毕业设计选题攻略

博主介绍:✌全网粉丝10W+,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久,选择我们就是选择放心、选择安心毕业✌ 🍅由于篇幅限制,想要获取完整文章或者源码,或者代做,拉到文章底部即可看到个人VX。🍅 2023年 - 2024年 最新计算机毕业设计 本科 选题大全 汇总 计算机专业毕业设计之避坑指南(开题答辩选导师必看) 感兴趣的可以先收藏起来,还

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map

《论软件设计模式及其应用》通关范文,软考高级系统架构设计师

论文真题 设计模式(Design Pattern)是一套被反复使用的代码设计经验总结,代表了软件开发人员在软件开发过程中面临的一般问题的解决方案和最佳实践。使用设计模式的目的是提高代码的可重用性,让代码更容易被他人理解,并保证代码可靠性。现有的设计模式已经在前人的系统中得以证实并广泛使用,它使代码编写真正实现工程化,将已证实的技术表述成设计模式,也会使新系统开发者更加容易理解其设计思路。根据

圆形缓冲区-MapReduce中的

这篇文章来自一个读者在面试过程中的一个问题,Hadoop在shuffle过程中使用了一个数据结构-环形缓冲区。 环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环形队列。 环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用

大数据面试通关手册|Hbase面试题(二)

⭐⭐欢迎关注博客主页:https://blog.csdn.net/u013411339⭐⭐欢迎点赞 👍 收藏 ⭐留言 📝 ,欢迎留言交流!⭐⭐本文由【王知无】原创,首发于 CSDN博客!⭐⭐本文首发CSDN论坛,未经过官方和本人允许,严禁转载! 技术背景 起源于谷歌旧三篇论文中bigtable。 设计目的 为了解决大数据环境中海量结构化数据的实时读写问题。为了弥补hadoop生态中没有