本文主要是介绍MIT 6.824 MapReduce,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
笑死,这个实验像是在做需求,不过没钱拿QAQ
仅供参考!测试脚本没完全通过,要求比较严格,这里仅仅把lab的map-reduce功能实现了
文章目录
- 知识点总结
- Lab1:MapReduce
- 需要注意的
- 代码
- coordinator
- worker
- rpc
知识点总结
-
简述一下GFS的一致性策略?
元数据的写入是加锁串行的
无并发写一个chunk的时候,当三个副本全部写成功的时候才会返回成功,否则返回失败
这将导致GFS系统中文件的不一致性
有并发的时候由primary决定写的顺序,所有的副本按照这个顺序执行,保证最终一致
注意!串行成功是一致已定义的,并行成功是一致未定义的 -
master恢复
master的历史信息使用快照存储,最近的操作使用日志存储 -
GFS如何进行快照?
首先master取消对当前chunk的租约,保证对chunk的修改通过master进行
之后创建快照 维护对于chunk的引用计数
后续计数>2,则分开独立访问 -
GFS文件系统结构?
GFS组织成树的结构,修改文件需要获得父亲节点的读锁和子节点的读写锁
文件惰性删除,文件重命名为包含时间戳的隐藏文件名,在例行的文件空间扫描中才会删除
这样删除更加可靠,cpu使用更加平衡,为人为的误操作兜底 -
GFS如何保证高可用?
服务快速拉起,master和chunk server都设计成秒级启动
chunk复制策略,保证数据不丢
master节点建设主备,client操作在主master和从master全部落盘后才返回,外部监控进程监控master状态并在master故障后选择新的master升主 -
GFS如何保证数据完整性?
chunk server将chunk切成64KB大小的块,并为每个块维护一个32位的checksum。对读操作,数据返回client之前会检查checksum。对写操作,需要对写范围有覆盖的第一个64KB块和最后一个先进行校验,防止原来存在损坏的数据被本次写隐藏了,然后进行实际写入并重新计算checksum。chunk server空闲时会对所有chunk做整体扫描,尤其针对一个不活动的chunk,防止master认为chunk已经有足够的副本数量了但是实际上副本内容已经损坏。 -
状态备份的两种方式?
状态转移(State transfer) 持续增量同步 Primary 的状态到 Backup,包括CPU、内存、IO设备等等;但是这些状态通常所占带宽很大,尤其是内存变化。
冗余状态机(Replicated State Machine) 将服务器看作是一个具有确定性状态的状态机,只要给定相同初始状态和同样顺序的确定输入,就能保持同样的状态。同步的是外部的事件/操作/输入;同步的内容通常较小,但是依赖主机的一些特性:比如指令执行的确定性(deterministic)。而在物理机上保证确定性很难,但是在 VM 上就简单的多,由于 hypervisor 有对 VM 有完全的控制权,因此可以通过某些手段来额外同步某些不确定性输入(比如类似随机数、系统时钟等)。 -
如何解决复制状态机中primary和backup不一致的问题?
控制输出,保证状态一致后响应成功 -
VM-FT如何解决重复输出的问题?
TCP栈序列号/检验重复的机制 -
VM-FT如何解决split brain?
三方机构Test-and-Set,Test-and-Set服务就是一个仲裁官,决定了两个副本中哪一个应该上线 通过标志位实现 -
raft 如何进行leader选举?
使用随机计时器 -
为什么要有raft?它用来解决什么问题?
raft是一个一致性算法,
复制状态机(replicated state machines)用于对有单个集群leader的大型系统进行leader选举和配置信息的存储。复制状态机通常使用复制日志实现,一致性算法的任务是保证复制日志的一致性。 -
共识算法的特性有哪些?
- 在网络延迟、分区和包丢失、复制和重排序条件下保证安全性(永远返回正确的结果)
- 只要大多数服务器可操作、可通信那么就是完全可用的
- 不依赖时间来保证一致性
- 少数速度较慢的服务器不影响整体系统性能
Lab1:MapReduce
需要注意的
- 中间文件名
mr-X-Y
,我的实现是每次worker
发送map rpc
请求的时候发送一个文件名作为X Done()
退出,多打日志看看中间有没有data race
,我的实现逻辑是当mapArray(map任务数组)
mapMap(map任务crash-recovery标记)
reduceArray(reduce任务数组)
reduceMap(reduce任务crash-recovery标记)
长度全为0时退出- 排序,我的实现是
reduce
读取根据nReduce
分组的文件,这个文件用map
保存在Coordinator struct
中 RPC-Server
没有在Client
注册回调接口,所以crash
的时候直接把任务放回任务数组就好- 因为
Golang
不太熟,所以踩了几个坑:RPC
的结构名需要首字母大写以便正常序列化/反序列化,GoLand
参数有时候会出问题,直接命令行启动最好; data race
我通过多加了几个mutex
解决,可以优化一下减小粒度bash
脚本中设置LC_COLLATE=C
保证sort
命令按照大写优先排序- 测试脚本没完全通过,仅供参考(脚本要求比较严格,这里仅仅把lab功能实现了)
代码
coordinator
package mrimport ("log""sync""time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"type Coordinator struct {// Your definitions here.midFileLock sync.MutexnReduce intmidFile map[int][]stringmapTaskLock sync.MutexmapTaskMapLock sync.MutexmapTaskNum intmapTask []stringmapTaskMap map[int]boolreduceTaskLock sync.MutexreduceTaskMapLock sync.MutexreduceTaskNum intreduceTask []intreduceTaskMap map[int]bool
}// Example Your code here -- RPC handlers for the worker to call.
//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {reply.Y = args.X + 1return nil
}
func (c *Coordinator) WorkerStatus(args *WorkerStatusArgs,reply *WorkerStatusReply) error{c.mapTaskLock.Lock()c.mapTaskMapLock.Lock()c.reduceTaskLock.Lock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()defer c.reduceTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()defer c.mapTaskLock.Unlock()if len(c.mapTask)>0{reply.Status = 0}else if len(c.mapTaskMap)==0{//begin reduceif len(c.reduceTask)==0{if len(c.reduceTaskMap)==0{reply.Status = 3}reply.Status = 2}else {reply.Status = 1}}else{//wait for all map task overreply.Status = 2}log.Printf("get worker status,status is %d,len maptask %d,len maptaskmap %d,len reducetask %d len reducetask map %d",reply.Status,len(c.mapTask),len(c.mapTaskMap),len(c.reduceTask),len(c.reduceTaskMap))return nil
}
func (c *Coordinator) MapRequest(args *MapCallArgs, reply *MapCallReply) error {c.mapTaskLock.Lock()var file stringif len(c.mapTask)>0{file = c.mapTask[0]c.mapTask = c.mapTask[1:len(c.mapTask)]}c.mapTaskLock.Unlock()if file!="" {c.mapTaskMapLock.Lock()defer c.mapTaskMapLock.Unlock()c.mapTaskNum++reply.MapTaskNum = c.mapTaskNumc.mapTaskMap[c.mapTaskNum] = truereply.Value = filereply.NReduce = c.nReducereply.Err = ""log.Printf("%d task be sent,filename is %s", reply.MapTaskNum,reply.Value)}else{reply.Value = ""reply.Err = "Empty queue"return nil}go func(){time.Sleep(10*time.Second)c.mapTaskMapLock.Lock()c.mapTaskLock.Lock()defer c.mapTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()if _,ok :=c.mapTaskMap[reply.MapTaskNum];ok{log.Printf("task %d fail,file %s come back",reply.MapTaskNum, file)delete(c.mapTaskMap,reply.MapTaskNum)c.mapTask = append(c.mapTask, file)}}()return nil
}
func (c *Coordinator) MapOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {log.Printf("map over request be called, TaskId %d ",args.TaskId)log.Printf("TaskId value: %d",c.mapTaskMap[args.TaskId])c.mapTaskMapLock.Lock()c.midFileLock.Lock()defer c.mapTaskMapLock.Unlock()defer c.midFileLock.Unlock()for i:=0;i<c.nReduce;i++{log.Printf("will register %d files",len(args.RegisterFile[i]))c.midFile[i] = append(c.midFile[i],args.RegisterFile[i]...)}delete(c.mapTaskMap,args.TaskId)return nil
}func (c *Coordinator) ReduceRequest(args *ReduceCallArgs, reply *ReduceCallReply) error {c.reduceTaskLock.Lock()taskId := -1if len(c.reduceTask)>0{taskId = c.reduceTask[0]log.Printf("will send reduce task,task id is %d",taskId)c.reduceTask = c.reduceTask[1:len(c.reduceTask)]}else{log.Printf("reduceTask is empty")reply.Err = "Empty queue"return nil}c.reduceTaskLock.Unlock()var _ intif taskId!=-1 {c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()c.midFileLock.Lock()defer c.midFileLock.Unlock()c.reduceTaskNum++_ = c.reduceTaskNumc.reduceTaskMap[taskId] = truereply.TaskId = taskIdreply.TaskName = c.midFile[taskId]log.Printf("will dispose %d file,task id is %d",len(c.midFile[taskId]),taskId)reply.Err = ""}else{reply.Err = "Empty queue"return nil}//goes normally,begin routine to monitor this taskgo func(){time.Sleep(10*time.Second)c.reduceTaskLock.Lock()defer c.reduceTaskLock.Unlock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()if _,ok :=c.reduceTaskMap[taskId];ok{log.Printf("task %d fail",reply.TaskId)delete(c.reduceTaskMap,taskId)c.reduceTask = append(c.reduceTask, taskId)}}()return nil
}
func (c *Coordinator) ReduceOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()delete(c.reduceTaskMap,args.TaskId)log.Printf("task %d delete",args.TaskId)return nil
}//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}// Done
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {c.mapTaskLock.Lock()c.mapTaskMapLock.Lock()c.reduceTaskLock.Lock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()defer c.reduceTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()defer c.mapTaskLock.Unlock()ret := false// Your code here.ret = len(c.mapTask)==0&&(len(c.mapTaskMap)==0)&&len(c.reduceTask)==0&&len(c.reduceTaskMap)==0return ret
}// MakeCoordinator
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// NReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}// Your code here.c.mapTask = filesc.nReduce = nReducec.reduceTaskNum = 0c.mapTaskNum = 0for i := 0;i<nReduce;i++{c.reduceTask = append(c.reduceTask,i)}c.midFile=make(map[int][]string,nReduce)c.mapTaskMap=make(map[int] bool,1000)c.reduceTaskMap=make(map[int] bool,1000)c.server()return &c
}
worker
package mrimport ("encoding/json""fmt""io/ioutil""os""sort""strconv""time"
)
import "log"
import "net/rpc"
import "hash/fnv"// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// KeyValue
// Map functions return a slice of KeyValue.
//
type KeyValue struct {Key stringValue string
}//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {h := fnv.New32a()_, err := h.Write([]byte(key))if err != nil {return 0}return int(h.Sum32() & 0x7fffffff)
}
func GetMidFileName(x int,y int) string{s := "mr-"s+=strconv.Itoa(x)s+="-"s+=strconv.Itoa(y)return s
}// Worker
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.for{args := WorkerStatusArgs{}reply := WorkerStatusReply{}ret := call("Coordinator.WorkerStatus",&args,&reply)if !ret{break}switch reply.Status {case 0:ExecuteMapTask(mapf)case 1:ExecuteReduceTask(reducef)case 2:time.Sleep(2*time.Second)case 3:log.Printf("all task over ,worker will quit")break}log.Printf("task status: %d",reply.Status)}// uncomment to send the Example RPC to the coordinator.// CallExample()}// ExecuteMapTask
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func ExecuteMapTask(mapf func(string, string) []KeyValue){args := MapCallArgs{}reply := MapCallReply{}var registerFile map[int][]stringregisterFile = make(map[int][]string,reply.NReduce)call("Coordinator.MapRequest",&args,&reply)file, err := os.Open(reply.Value)if err != nil {log.Fatalf("cannot open %v", reply.Value)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", reply.Value)}err = file.Close()if err != nil {return}//wait for goroutines to completech := make(chan struct{})kva := mapf(reply.Value, string(content))//Temporary store of key-value pairsvar midResult map[int][]KeyValuemidResult = make(map[int][]KeyValue,10+5)for _,iter := range kva{iter := itergo func() {taskYNum := ihash(iter.Key)%reply.NReducemidResult[taskYNum] = append(midResult[taskYNum],iter)ch <- struct{}{}}()<-ch}for taskYNum:=0;taskYNum<reply.NReduce;taskYNum++{//sort.Sort(ByKey(midResult[taskYNum]))filename := GetMidFileName(reply.MapTaskNum,taskYNum)err := os.Remove(filename)file, err2 := ioutil.TempFile(".", "tmp")if err2 != nil {fmt.Println("文件创建失败")return}defer func(name string) {err := os.Remove(name)if err != nil {return}}(file.Name())enc := json.NewEncoder(file)for _,iter := range midResult[taskYNum]{err = enc.Encode(&iter)}err = os.Rename(file.Name(), filename)if err != nil {return}log.Printf("file write ok,will append registerfile subscript is %d,filename %s",taskYNum,filename)registerFile[taskYNum] = append(registerFile[taskYNum],filename)}mapOverRequestArgs := OverRequestArgs{}mapOverRequestReply := OverRequestReply{}mapOverRequestArgs.RegisterFile = registerFilemapOverRequestArgs.TaskId = reply.MapTaskNumlog.Printf("task ok,begin return %s, register file size %d", reply.Value,len(registerFile))call("Coordinator.MapOverRequest",&mapOverRequestArgs,&mapOverRequestReply)}func ExecuteReduceTask(reducef func(string, []string) string){args := ReduceCallArgs{}reply := ReduceCallReply{}call("Coordinator.ReduceRequest",&args,&reply)log.Printf("will dispose task %d",reply.TaskId)tmpFile, err2 := ioutil.TempFile(".", "tmp")if err2 != nil {log.Println("文件创建失败")return}defer func(name string) {err := os.Remove(name)if err != nil {return}}(tmpFile.Name())log.Printf("Reduce task file size: %d",len(reply.TaskName))var kva []KeyValue//range all files get slicefor _, fileName := range reply.TaskName {log.Printf("range taskname is %s", fileName)file, err := os.Open(fileName)if err != nil {log.Fatalf("cannot open %v", file)}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}}//sort slicessort.Sort(ByKey(kva))//get result and print to final filevar nowStr stringvar values []stringfor subscript,kv :=range kva{if subscript==len(kva)-1 {if nowStr!=""{output:= reducef(kv.Key,values)_, err := fmt.Fprintf(tmpFile, "%v %v\n", nowStr, output)if err != nil {log.Printf("output to tmp file fail")return}}break}if kv.Key== nowStr {values = append(values, kv.Value)}else{if nowStr=="" {nowStr = kv.Keyvalues = nilvalues = append(values, kv.Value)}else{output:= reducef(kv.Key,values)_, err := fmt.Fprintf(tmpFile, "%v %v\n", kv.Key, output)if err != nil {log.Printf("output to tmp file fail")return}nowStr = kv.Keyvalues = nilvalues = append(values, kv.Value)}}}fileName := "mr-out-"fileName+=strconv.Itoa(reply.TaskId)err := os.Remove(fileName)if err != nil {log.Printf("file not exist")}err = os.Rename(tmpFile.Name(), fileName)if err != nil {log.Printf("rename file fail")return}OverRequestArgs := OverRequestArgs{}OverRequestReply := OverRequestReply{}OverRequestArgs.TaskId = reply.TaskIdlog.Printf("reduce task ok,begin return %d", reply.TaskId)call("Coordinator.ReduceOverRequest",&OverRequestArgs,&OverRequestReply)}func CallExample() {// declare an argument structure.args := ExampleArgs{}// fill in the argument(s).args.X = 99// declare a reply structure.reply := ExampleReply{}// send the RPC request, wait for the reply.call("Coordinator.Example", &args, &reply)// reply.Y should be 100.fmt.Printf("reply.Y %v\n", reply.Y)
}//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer func(c *rpc.Client) {err := c.Close()if err != nil {}}(c)err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}
rpc
package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import "os"
import "strconv"//
// example to show how to declare the arguments
// and reply for an RPC.
//type ExampleArgs struct {X int
}type ExampleReply struct {Y int
}// Add your RPC definitions here.type MapCallArgs struct{
}
type MapCallReply struct{Err stringValue stringNReduce intMapTaskNum int
}
type ReduceCallArgs struct{}
type ReduceCallReply struct{TaskName []stringTaskId intErr string
}
type WorkerStatusArgs struct{}
type WorkerStatusReply struct{Status int
}
type OverRequestArgs struct{TaskId intRegisterFile map[int][]string
}
type OverRequestReply struct{}type ReduceOverRequestArgs struct{PId int
}
type ReduceOverRequestReply struct{}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/824-mr-"s += strconv.Itoa(os.Getuid())return s
}
这篇关于MIT 6.824 MapReduce的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!