MIT 6.824 MapReduce

2024-04-21 20:58
文章标签 mit mapreduce 6.824

本文主要是介绍MIT 6.824 MapReduce,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

笑死,这个实验像是在做需求,不过没钱拿QAQ
仅供参考!测试脚本没完全通过,要求比较严格,这里仅仅把lab的map-reduce功能实现了

文章目录

  • 知识点总结
  • Lab1:MapReduce
    • 需要注意的
    • 代码
      • coordinator
      • worker
      • rpc

知识点总结

  • 简述一下GFS的一致性策略?
    元数据的写入是加锁串行的
    无并发写一个chunk的时候,当三个副本全部写成功的时候才会返回成功,否则返回失败
    这将导致GFS系统中文件的不一致性
    有并发的时候由primary决定写的顺序,所有的副本按照这个顺序执行,保证最终一致
    注意!串行成功是一致已定义的,并行成功是一致未定义的

  • master恢复
    master的历史信息使用快照存储,最近的操作使用日志存储

  • GFS如何进行快照?
    首先master取消对当前chunk的租约,保证对chunk的修改通过master进行
    之后创建快照 维护对于chunk的引用计数
    后续计数>2,则分开独立访问

  • GFS文件系统结构?
    GFS组织成树的结构,修改文件需要获得父亲节点的读锁和子节点的读写锁
    文件惰性删除,文件重命名为包含时间戳的隐藏文件名,在例行的文件空间扫描中才会删除
    这样删除更加可靠,cpu使用更加平衡,为人为的误操作兜底

  • GFS如何保证高可用?
    服务快速拉起,master和chunk server都设计成秒级启动
    chunk复制策略,保证数据不丢
    master节点建设主备,client操作在主master和从master全部落盘后才返回,外部监控进程监控master状态并在master故障后选择新的master升主

  • GFS如何保证数据完整性?
    chunk server将chunk切成64KB大小的块,并为每个块维护一个32位的checksum。对读操作,数据返回client之前会检查checksum。对写操作,需要对写范围有覆盖的第一个64KB块和最后一个先进行校验,防止原来存在损坏的数据被本次写隐藏了,然后进行实际写入并重新计算checksum。chunk server空闲时会对所有chunk做整体扫描,尤其针对一个不活动的chunk,防止master认为chunk已经有足够的副本数量了但是实际上副本内容已经损坏。

  • 状态备份的两种方式?
    状态转移(State transfer) 持续增量同步 Primary 的状态到 Backup,包括CPU、内存、IO设备等等;但是这些状态通常所占带宽很大,尤其是内存变化。
    冗余状态机(Replicated State Machine) 将服务器看作是一个具有确定性状态的状态机,只要给定相同初始状态和同样顺序的确定输入,就能保持同样的状态。同步的是外部的事件/操作/输入;同步的内容通常较小,但是依赖主机的一些特性:比如指令执行的确定性(deterministic)。而在物理机上保证确定性很难,但是在 VM 上就简单的多,由于 hypervisor 有对 VM 有完全的控制权,因此可以通过某些手段来额外同步某些不确定性输入(比如类似随机数、系统时钟等)。

  • 如何解决复制状态机中primary和backup不一致的问题?
    控制输出,保证状态一致后响应成功

  • VM-FT如何解决重复输出的问题?
    TCP栈序列号/检验重复的机制

  • VM-FT如何解决split brain?
    三方机构Test-and-Set,Test-and-Set服务就是一个仲裁官,决定了两个副本中哪一个应该上线 通过标志位实现

  • raft 如何进行leader选举?
    使用随机计时器

  • 为什么要有raft?它用来解决什么问题?
    raft是一个一致性算法,
    复制状态机(replicated state machines)用于对有单个集群leader的大型系统进行leader选举和配置信息的存储。复制状态机通常使用复制日志实现,一致性算法的任务是保证复制日志的一致性。

  • 共识算法的特性有哪些?

  1. 在网络延迟、分区和包丢失、复制和重排序条件下保证安全性(永远返回正确的结果)
  2. 只要大多数服务器可操作、可通信那么就是完全可用的
  3. 不依赖时间来保证一致性
  4. 少数速度较慢的服务器不影响整体系统性能

Lab1:MapReduce

需要注意的

  1. 中间文件名mr-X-Y,我的实现是每次worker发送map rpc请求的时候发送一个文件名作为X
  2. Done()退出,多打日志看看中间有没有data race,我的实现逻辑是当mapArray(map任务数组) mapMap(map任务crash-recovery标记) reduceArray(reduce任务数组) reduceMap(reduce任务crash-recovery标记) 长度全为0时退出
  3. 排序,我的实现是reduce读取根据nReduce 分组的文件,这个文件用map保存在Coordinator struct
  4. RPC-Server 没有在Client注册回调接口,所以crash的时候直接把任务放回任务数组就好
  5. 因为Golang不太熟,所以踩了几个坑:RPC的结构名需要首字母大写以便正常序列化/反序列化,GoLand参数有时候会出问题,直接命令行启动最好;
  6. data race我通过多加了几个mutex解决,可以优化一下减小粒度
  7. bash脚本中设置LC_COLLATE=C保证sort命令按照大写优先排序
  8. 测试脚本没完全通过,仅供参考(脚本要求比较严格,这里仅仅把lab功能实现了)

代码

coordinator

package mrimport ("log""sync""time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"type Coordinator struct {// Your definitions here.midFileLock sync.MutexnReduce       intmidFile		map[int][]stringmapTaskLock sync.MutexmapTaskMapLock sync.MutexmapTaskNum intmapTask       []stringmapTaskMap map[int]boolreduceTaskLock sync.MutexreduceTaskMapLock sync.MutexreduceTaskNum intreduceTask []intreduceTaskMap map[int]bool
}// Example Your code here -- RPC handlers for the worker to call.
//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {reply.Y = args.X + 1return nil
}
func (c *Coordinator) WorkerStatus(args *WorkerStatusArgs,reply *WorkerStatusReply) error{c.mapTaskLock.Lock()c.mapTaskMapLock.Lock()c.reduceTaskLock.Lock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()defer c.reduceTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()defer c.mapTaskLock.Unlock()if len(c.mapTask)>0{reply.Status = 0}else if len(c.mapTaskMap)==0{//begin reduceif len(c.reduceTask)==0{if len(c.reduceTaskMap)==0{reply.Status = 3}reply.Status = 2}else {reply.Status = 1}}else{//wait for all map task overreply.Status = 2}log.Printf("get worker status,status is %d,len maptask %d,len maptaskmap %d,len reducetask %d len reducetask map %d",reply.Status,len(c.mapTask),len(c.mapTaskMap),len(c.reduceTask),len(c.reduceTaskMap))return nil
}
func (c *Coordinator) MapRequest(args *MapCallArgs, reply *MapCallReply) error {c.mapTaskLock.Lock()var file stringif len(c.mapTask)>0{file = c.mapTask[0]c.mapTask = c.mapTask[1:len(c.mapTask)]}c.mapTaskLock.Unlock()if file!="" {c.mapTaskMapLock.Lock()defer c.mapTaskMapLock.Unlock()c.mapTaskNum++reply.MapTaskNum = c.mapTaskNumc.mapTaskMap[c.mapTaskNum] = truereply.Value = filereply.NReduce = c.nReducereply.Err = ""log.Printf("%d task be sent,filename is %s", reply.MapTaskNum,reply.Value)}else{reply.Value = ""reply.Err = "Empty queue"return nil}go func(){time.Sleep(10*time.Second)c.mapTaskMapLock.Lock()c.mapTaskLock.Lock()defer c.mapTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()if _,ok :=c.mapTaskMap[reply.MapTaskNum];ok{log.Printf("task %d fail,file %s come back",reply.MapTaskNum, file)delete(c.mapTaskMap,reply.MapTaskNum)c.mapTask = append(c.mapTask, file)}}()return nil
}
func (c *Coordinator) MapOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {log.Printf("map over request be called, TaskId %d ",args.TaskId)log.Printf("TaskId value: %d",c.mapTaskMap[args.TaskId])c.mapTaskMapLock.Lock()c.midFileLock.Lock()defer c.mapTaskMapLock.Unlock()defer c.midFileLock.Unlock()for i:=0;i<c.nReduce;i++{log.Printf("will register %d files",len(args.RegisterFile[i]))c.midFile[i] = append(c.midFile[i],args.RegisterFile[i]...)}delete(c.mapTaskMap,args.TaskId)return nil
}func (c *Coordinator) ReduceRequest(args *ReduceCallArgs, reply *ReduceCallReply) error {c.reduceTaskLock.Lock()taskId := -1if len(c.reduceTask)>0{taskId = c.reduceTask[0]log.Printf("will send reduce task,task id is %d",taskId)c.reduceTask = c.reduceTask[1:len(c.reduceTask)]}else{log.Printf("reduceTask is empty")reply.Err = "Empty queue"return nil}c.reduceTaskLock.Unlock()var _ intif taskId!=-1 {c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()c.midFileLock.Lock()defer c.midFileLock.Unlock()c.reduceTaskNum++_ = c.reduceTaskNumc.reduceTaskMap[taskId] = truereply.TaskId = taskIdreply.TaskName = c.midFile[taskId]log.Printf("will dispose %d file,task id is %d",len(c.midFile[taskId]),taskId)reply.Err = ""}else{reply.Err = "Empty queue"return nil}//goes normally,begin routine to monitor this taskgo func(){time.Sleep(10*time.Second)c.reduceTaskLock.Lock()defer c.reduceTaskLock.Unlock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()if _,ok :=c.reduceTaskMap[taskId];ok{log.Printf("task %d fail",reply.TaskId)delete(c.reduceTaskMap,taskId)c.reduceTask = append(c.reduceTask, taskId)}}()return nil
}
func (c *Coordinator) ReduceOverRequest(args *OverRequestArgs, reply *OverRequestReply) error {c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()delete(c.reduceTaskMap,args.TaskId)log.Printf("task %d delete",args.TaskId)return nil
}//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}// Done
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {c.mapTaskLock.Lock()c.mapTaskMapLock.Lock()c.reduceTaskLock.Lock()c.reduceTaskMapLock.Lock()defer c.reduceTaskMapLock.Unlock()defer c.reduceTaskLock.Unlock()defer c.mapTaskMapLock.Unlock()defer c.mapTaskLock.Unlock()ret := false// Your code here.ret = len(c.mapTask)==0&&(len(c.mapTaskMap)==0)&&len(c.reduceTask)==0&&len(c.reduceTaskMap)==0return ret
}// MakeCoordinator
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// NReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}// Your code here.c.mapTask = filesc.nReduce = nReducec.reduceTaskNum = 0c.mapTaskNum = 0for i := 0;i<nReduce;i++{c.reduceTask = append(c.reduceTask,i)}c.midFile=make(map[int][]string,nReduce)c.mapTaskMap=make(map[int] bool,1000)c.reduceTaskMap=make(map[int] bool,1000)c.server()return &c
}

worker

package mrimport ("encoding/json""fmt""io/ioutil""os""sort""strconv""time"
)
import "log"
import "net/rpc"
import "hash/fnv"// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// KeyValue
// Map functions return a slice of KeyValue.
//
type KeyValue struct {Key   stringValue string
}//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {h := fnv.New32a()_, err := h.Write([]byte(key))if err != nil {return 0}return int(h.Sum32() & 0x7fffffff)
}
func GetMidFileName(x int,y int) string{s := "mr-"s+=strconv.Itoa(x)s+="-"s+=strconv.Itoa(y)return s
}// Worker
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.for{args := WorkerStatusArgs{}reply := WorkerStatusReply{}ret := call("Coordinator.WorkerStatus",&args,&reply)if !ret{break}switch reply.Status {case 0:ExecuteMapTask(mapf)case 1:ExecuteReduceTask(reducef)case 2:time.Sleep(2*time.Second)case 3:log.Printf("all task over ,worker will quit")break}log.Printf("task status: %d",reply.Status)}// uncomment to send the Example RPC to the coordinator.// CallExample()}// ExecuteMapTask
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func ExecuteMapTask(mapf func(string, string) []KeyValue){args := MapCallArgs{}reply := MapCallReply{}var registerFile map[int][]stringregisterFile = make(map[int][]string,reply.NReduce)call("Coordinator.MapRequest",&args,&reply)file, err := os.Open(reply.Value)if err != nil {log.Fatalf("cannot open %v", reply.Value)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", reply.Value)}err = file.Close()if err != nil {return}//wait for goroutines to completech := make(chan struct{})kva := mapf(reply.Value, string(content))//Temporary store of key-value pairsvar midResult map[int][]KeyValuemidResult = make(map[int][]KeyValue,10+5)for _,iter := range kva{iter := itergo func() {taskYNum := ihash(iter.Key)%reply.NReducemidResult[taskYNum] = append(midResult[taskYNum],iter)ch <- struct{}{}}()<-ch}for taskYNum:=0;taskYNum<reply.NReduce;taskYNum++{//sort.Sort(ByKey(midResult[taskYNum]))filename := GetMidFileName(reply.MapTaskNum,taskYNum)err := os.Remove(filename)file, err2 := ioutil.TempFile(".", "tmp")if err2 != nil {fmt.Println("文件创建失败")return}defer func(name string) {err := os.Remove(name)if err != nil {return}}(file.Name())enc := json.NewEncoder(file)for _,iter := range midResult[taskYNum]{err = enc.Encode(&iter)}err = os.Rename(file.Name(), filename)if err != nil {return}log.Printf("file write ok,will append registerfile subscript is %d,filename %s",taskYNum,filename)registerFile[taskYNum] = append(registerFile[taskYNum],filename)}mapOverRequestArgs := OverRequestArgs{}mapOverRequestReply := OverRequestReply{}mapOverRequestArgs.RegisterFile = registerFilemapOverRequestArgs.TaskId = reply.MapTaskNumlog.Printf("task ok,begin return %s, register file size %d", reply.Value,len(registerFile))call("Coordinator.MapOverRequest",&mapOverRequestArgs,&mapOverRequestReply)}func ExecuteReduceTask(reducef func(string, []string) string){args := ReduceCallArgs{}reply := ReduceCallReply{}call("Coordinator.ReduceRequest",&args,&reply)log.Printf("will dispose task %d",reply.TaskId)tmpFile, err2 := ioutil.TempFile(".", "tmp")if err2 != nil {log.Println("文件创建失败")return}defer func(name string) {err := os.Remove(name)if err != nil {return}}(tmpFile.Name())log.Printf("Reduce task file size: %d",len(reply.TaskName))var kva []KeyValue//range all files get slicefor _, fileName := range reply.TaskName {log.Printf("range taskname is %s", fileName)file, err := os.Open(fileName)if err != nil {log.Fatalf("cannot open %v", file)}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}}//sort slicessort.Sort(ByKey(kva))//get result and print to final filevar nowStr stringvar values []stringfor subscript,kv :=range kva{if subscript==len(kva)-1 {if nowStr!=""{output:= reducef(kv.Key,values)_, err := fmt.Fprintf(tmpFile, "%v %v\n", nowStr, output)if err != nil {log.Printf("output to tmp file fail")return}}break}if kv.Key== nowStr {values = append(values, kv.Value)}else{if nowStr=="" {nowStr = kv.Keyvalues = nilvalues = append(values, kv.Value)}else{output:= reducef(kv.Key,values)_, err := fmt.Fprintf(tmpFile, "%v %v\n", kv.Key, output)if err != nil {log.Printf("output to tmp file fail")return}nowStr = kv.Keyvalues = nilvalues = append(values, kv.Value)}}}fileName := "mr-out-"fileName+=strconv.Itoa(reply.TaskId)err := os.Remove(fileName)if err != nil {log.Printf("file not exist")}err = os.Rename(tmpFile.Name(), fileName)if err != nil {log.Printf("rename file fail")return}OverRequestArgs := OverRequestArgs{}OverRequestReply := OverRequestReply{}OverRequestArgs.TaskId = reply.TaskIdlog.Printf("reduce task ok,begin return %d", reply.TaskId)call("Coordinator.ReduceOverRequest",&OverRequestArgs,&OverRequestReply)}func CallExample() {// declare an argument structure.args := ExampleArgs{}// fill in the argument(s).args.X = 99// declare a reply structure.reply := ExampleReply{}// send the RPC request, wait for the reply.call("Coordinator.Example", &args, &reply)// reply.Y should be 100.fmt.Printf("reply.Y %v\n", reply.Y)
}//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer func(c *rpc.Client) {err := c.Close()if err != nil {}}(c)err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}

rpc

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import "os"
import "strconv"//
// example to show how to declare the arguments
// and reply for an RPC.
//type ExampleArgs struct {X int
}type ExampleReply struct {Y int
}// Add your RPC definitions here.type MapCallArgs struct{
}
type MapCallReply struct{Err stringValue      stringNReduce    intMapTaskNum int
}
type ReduceCallArgs struct{}
type ReduceCallReply struct{TaskName []stringTaskId   intErr      string
}
type WorkerStatusArgs struct{}
type WorkerStatusReply struct{Status int
}
type OverRequestArgs struct{TaskId          intRegisterFile map[int][]string
}
type OverRequestReply struct{}type ReduceOverRequestArgs struct{PId int
}
type ReduceOverRequestReply struct{}
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/824-mr-"s += strconv.Itoa(os.Getuid())return s
}

这篇关于MIT 6.824 MapReduce的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/924023

相关文章

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map

圆形缓冲区-MapReduce中的

这篇文章来自一个读者在面试过程中的一个问题,Hadoop在shuffle过程中使用了一个数据结构-环形缓冲区。 环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单。能很快知道队列是否满为空。能以很快速度的来存取数据。 因为有简单高效的原因,甚至在硬件都实现了环形队列。 环形队列广泛用于网络数据收发,和不同程序间数据交换(比如内核与应用

【硬刚Hadoop】HADOOP MAPREDUCE(11):Join应用

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Reduce Join Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在

【硬刚Hadoop】HADOOP MAPREDUCE(10):OutputFormat数据输出

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 OutputFormat接口实现类 2 自定义OutputFormat 3 自定义OutputFormat案例实操 1.需求 过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/o

【硬刚Hadoop】HADOOP MAPREDUCE(9):MapReduce内核源码解析(2)ReduceTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1.ReduceTask工作机制 ReduceTask工作机制,如图4-19所示。 图4-19 ReduceTask工作机制 (1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中

【硬刚Hadoop】HADOOP MAPREDUCE(8):MapReduce内核源码解析(1)MapTask工作机制

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 MapTask工作机制 MapTask工作机制如图4-12所示。 图4-12  MapTask工作机制 (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 (2)Map阶段:该节点主要是将解析出

【硬刚Hadoop】HADOOP MAPREDUCE(7):Shuffle机制(3)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 7 Combiner合并 (6)自定义Combiner实现步骤 (a)自定义一个Combiner继承Reducer,重写Reduce方法 public class WordcountCombiner extends Reducer<Text, IntWritable, Text,

【硬刚Hadoop】HADOOP MAPREDUCE(6):Shuffle机制(2)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 4 WritableComparable排序 1.排序的分类 2.自定义排序WritableComparable (1)原理分析 bean对象做为key传输,需要实现WritableComp

【硬刚Hadoop】HADOOP MAPREDUCE(5):Shuffle机制(1)

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Shuffle机制 Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。如图4-14所示。 2 Partition分区