本文主要是介绍MapReduce中的Shuffle和Sort分析 combine分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、Hadoop运行原理
Hadoop是一个开源的可运行于大规模集群上的分布式并行编程框架,其最核心的设计包括:MapReduce和HDFS。基于 Hadoop,你可以轻松地编写可处理海量数据的分布式并行程序,并将其运行于由成百上千个结点组成的大规模计算机集群上。
基于MapReduce计算模型编写分布式并行程序相对简单,程序员的主要工作就是设计实现Map和Reduce类,其它的并行编程中的种种复杂问题,如分布式存储,工作调度,负载平衡,容错处理,网络通信等,均由 MapReduce框架和HDFS文件系统负责处理,程序员完全不用操心。换句话说程序员只需要关心自己的业务逻辑即可,不必关心底层的通信机制等问题,即可编写出复杂高效的并行程序。如果说分布式并行编程的难度足以让普通程序员望而生畏的话,开源的 Hadoop的出现极大的降低了它的门槛。
2、Mapreduce原理
简单的说:MapReduce框架的核心步骤主要分两部分:Map和Reduce。当你向MapReduce框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map任务,然后分配到不同的节点上去执行,每一个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce任务的输入数据。Reduce对数据做进一步处理之后,输出最终结果。
MapReduce是Hadoop的核心技术之一,为分布式计算的程序设计提供了良好的编程接口,并且屏蔽了底层通信原理,使得程序员只需关心业务逻辑本事,就可轻易的编写出基于集群的分布式并行程序。从它名字上来看,大致可以看出个两个动词Map和Reduce,“Map(展开)”就是将一个任务分解成为多个子任务并行的执行,“Reduce”就是将分解后多任务处理的结果汇总起来,得出最后的分析结果并输出。
适合用 MapReduce来处理的数据集(或任务)有一个基本要求:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
Map-Reduce的处理过程主要涉及以下四个部分:
?Client进程:用于提交Map-reduce任务job;
?JobTracker进程:其为一个Java进程,其main class为JobTracker;
?TaskTracker进程:其为一个Java进程,其main class为TaskTracker;
?HDFS:Hadoop分布式文件系统,用于在各个进程间共享Job相关的文件;
其中JobTracker进程作为主控,用于调度和管理其它的TaskTracker进程, JobTracker可以运行于集群中任一台计算机上,通常情况下配置JobTracker进程运行在NameNode节点之上。TaskTracker负责执行JobTracker进程分配给的任务,其必须运行于 DataNode 上,即 DataNode 既是数据存储结点,也是计算结点。 JobTracker将Map任务和Reduce任务分发给空闲的TaskTracker,让这些任务并行运行,并负责监控任务的运行情况。如果某一个 TaskTracker出故障了,JobTracker会将其负责的任务转交给另一个空闲的TaskTracker重新运行。
本地计算-原理
数据存储在哪一台计算机上,就由这台计算机进行这部分数据的计算,这样可以减少数据在网络上的传输,降低对网络带宽的需求。在Hadoop这样的基于集群的分布式并行系统中,计算结点可以很方便地扩充,而因它所能够提供的计算能力近乎是无限的,但是由是数据需要在不同的计算机之间流动,故网络带宽变成了瓶颈,是非常宝贵的,“本地计算”是最有效的一种节约网络带宽的手段,业界把这形容为“移动计算比移动数据更经济”。
3、HDFS存储的机制
Hadoop的分布式文件系统 HDFS是建立在Linux文件系统之上的一个虚拟分布式文件系统,它由一个管理节点 ( NameNode )和N个数据节点 ( DataNode )组成,每个节点均是一台普通的计算机。在使用上同我们熟悉的单机上的文件系统非常类似,一样可以建目录,创建,复制,删除文件,查看文件内容等。但其底层实现上是把文件切割成Block(块),然后这些 Block分散地存储于不同的 DataNode 上,每个 Block还可以复制数份存储于不同的 DataNode上,达到容错容灾之目的。NameNode则是整个 HDFS的核心,它通过维护一些数据结构,记录了每一个文件被切割成了多少个 Block,这些 Block可以从哪些 DataNode中获得,各个 DataNode的状态等重要信息。
HDFS的数据块
每个磁盘都有默认的数据块大小,这是磁盘进行读写的基本单位.构建于单个磁盘之上的文件系统通过磁盘块来管理该文件系统中的块.该文件系统中的块一般为磁盘块的整数倍.磁盘块一般为512字节.HDFS也有块的概念,默认为64MB(一个map处理的数据大小).HDFS上的文件也被划分为块大小的多个分块,与其他文件系统不同的是,HDFS中小于一个块大小的文件不会占据整个块的空间.
任务粒度——数据切片(Splits)
把原始大数据集切割成小数据集时,通常让小数据集小于或等于 HDFS中一个 Block的大小(缺省是 64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。有 M个小数据集待处理,就启动 M个 Map任务,注意这 M个 Map任务分布于 N台计算机上并行运行,Reduce任务的数量 R则可由用户指定。
HDFS用块存储带来的第一个明显的好处一个文件的大小可以大于网络中任意一个磁盘的容量,数据块可以利用磁盘中任意一个磁盘进行存储.第二个简化了系统的设计,将控制单元设置为块,可简化存储管理,计算单个磁盘能存储多少块就相对容易.同时也消除了对元数据的顾虑,如权限信息,可以由其他系统单独管理。
4、举一个简单的例子说明MapReduce的运行机制
以计算一个文本文件中每个单词出现的次数的程序为例,<k1,v1>可以是 <行在文件中的偏移位置,文件中的一行>,经 Map函数映射之后,形成一批中间结果 <单词,出现次数>,而 Reduce函数则可以对中间结果进行处理,将相同单词的出现次数进行累加,得到每个单词的总的出现次数。
5.MapReduce的核心过程----Shuffle['??fl]和Sort
shuffle是mapreduce的心脏,了解了这个过程,有助于编写效率更高的mapreduce程序和hadoop调优。
Shuffle是指从Map产生输出开始,包括系统执行排序以及传送Map输出到Reducer作为输入的过程。如下图所示:
首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个Map任务都有一个用来写入“输出数据”的“循环内存缓冲区”,这个缓冲区默认大小是100M(可以通过io.sort.mb属性来设置具体的大小),当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直到spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。如图3所示。这是一个多路归并过程,最大归并路数由io.sort.factor控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine属性控制),那么Combiner将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer的数据量。默认输出是不被压缩的,但可以很简单的设置mapred.compress.map.output为true启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定。
当spill 文件归并完毕后,Map 将删除所有的临时spill文件,并告知TaskTracker任务已完成。Reducers通过HTTP来获取对应的数据。用来传输partitions数据的工作线程个数由tasktracker.http.threads控制,这个设定是针对每一个TaskTracker的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到Shuffle的Reduce部分。Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定mapred.reduce.parallel.copies来改变线程数。
Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除。
如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map端就已经完成),这个阶段会对所有的Map输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map端的merge一样),那最终需要5次归并。每次归并会把10个文件归并为一个,最终生成5个中间文件。在这一步之后,系统不再把5个中间文件归并成一个,而是排序后直接“喂”给Reduce函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40个文件,并不是每次都归并10个最终得到4个文件,相反第一次只归并4个文件,然后再实现三次归并,每次10个,最终得到4个归并好的文件和6个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce函数那里。在Reduce阶段,Reduce函数会作用在排序输出的每一个key上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS中,因为TaskTracker节点也运行着一个DataNode进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce的Shuffle和Sort分析完毕。
6、Hadoop中Combiner的作用?
6.1 Partition
把 Map任务输出的中间结果按 key的范围划分成 R份( R是预先定义的 Reduce任务的个数),划分时通常使用hash函数如: hash(key) mod R,这样可以保证某一段范围内的key,一定是将会由一个Reduce任务来处理,这样可以简化 Reduce获取计算数据的过程。
6.2 Combine操作
在 partition之前,还可以对中间结果先做 combine,即将中间结果中有相同 key的 <key, value>对合并成一对。combine的过程与 Reduce的过程类似,很多情况下就可以直接使用 Reduce函数,但 combine是作为 Map任务的一部分,在执行完 Map函数后紧接着执行的,而Reduce必须在所有的Map操作完成后才能进行。Combine能够减少中间结果中 <key, value>对的数目,从而减少网络流量。
6.3 Reduce任务从 Map任务结点取中间结果
Map 任务的中间结果在做完 Combine和 Partition之后,以文件形式存于本地磁盘。中间结果文件的位置会通知主控 JobTracker,JobTracker再通知 Reduce任务到哪一个DataNode上去取中间结果。注意所有的 Map任务产生中间结果均按其 Key用同一个 Hash函数划分成了 R份,R个 Reduce任务各自负责一段 Key区间。每个 Reduce需要向许多个原Map任务结点以取得落在其负责的Key区间内的中间结果,然后执行 Reduce函数,形成一个最终的结果文件。
6.4 任务管道
有R个 Reduce任务,就会有 R个最终结果,很多情况下这 R个最终结果并不需要合并成一个最终结果。因为这 R个最终结果又可以做为另一个计算任务的输入,开始另一个并行计算任务。
这个 MapReduce的计算过程简而言之,就是将大数据集分解为成百上千的小数据集,每个(或若干个)数据集分别由集群中的一个结点(一般就是一台普通的计算机)进行处理并生成中间结果,然后这些中间结果又由大量的结点进行合并,形成最终结果。
计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 <key, value>对转换成另一个或一批 <key, value>对输出。
6.5、总结
(1)、combiner使用的合适,可以在满足业务的情况下提升job的速度,如果不合适,则将导致输出的结果不正确,但是不是所有的场合都适合combiner。根据自己的业务来使用。hadoop就是map和 reduce的过程。服务器上一个目录节点+多个数据节点。将程序传送到各个节点,在数据节点上进行计算
(2)、将数据存储到不同节点,用map方式对应管理,在各个节点进行计算,采用reduce进行合并结果集
(3)、就是通过java程序和目录节点配合,将数据存放到不同数据节点上
(4)、看上边的2.注意,分布式注重的是计算,不是每个场景都适合
(5)、将文件存放到不同的数据节点,然后每个节点计算出前十个进行reduce的计算。
最近整了很长一段时间才了解了map reduce的工作原理,shuffle是mapreduce的心脏,了解了这个过程,有助于编写效率更高的mapreduce程序和hadoop调优。自己画了一幅流程图(点击查看全图):
另外,还找到一篇文章,很好,引用一下。
Hadoop
是Apache 下的一个项目,由HDFS、MapReduce、HBase、Hive 和ZooKeeper等成员组成。其中,HDFS 和MapReduce 是两个最基础最重要的成员。
HDFS是Google GFS 的开源版本,一个高度容错的分布式文件系统,它能够提供高吞吐量的数据访问,适合存储海量(PB 级)的大文件(通常超过64M),其原理如下图所示:
采用Master/Slave 结构。NameNode 维护集群内的元数据,对外提供创建、打开、删除和重命名文件或目录的功能。DatanNode 存储数据,并提负责处理数据的读写请求。DataNode定期向NameNode 上报心跳,NameNode 通过响应心跳来控制DataNode。
InfoWord将MapReduce 评为2009 年十大新兴技术的冠军。MapReduce 是大规模数据(TB 级)计算的利器,Map 和Reduce 是它的主要思想,来源于函数式编程语言,它的原理如下图所示:Map负责将数据打散,Reduce负责对数据进行聚集,用户只需要实现map 和reduce 两个接口,即可完成TB级数据的计算,常见的应用包括:日志分析和数据挖掘等数据分析应用。另外,还可用于科学数据计算,如圆周率PI 的计算等。Hadoop MapReduce的实现也采用了Master/Slave 结构。Master 叫做JobTracker,而Slave 叫做TaskTracker。用户提交的计算叫做Job,每一个Job会被划分成若干个Tasks。JobTracker负责Job 和Tasks 的调度,而TaskTracker负责执行Tasks。
MapReduce中的Shuffle和Sort分析
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce的数据流图如图1 所示:
本文的重点是剖析MapReduce的核心过程----Shuffle和Sort。在本文中,Shuffle是指从Map产生输出开始,包括系统执行排序以及传送Map输出到Reducer作为输入的过程。在这里我们将去探究Shuffle是如何工作的,因为对基础的理解有助于对MapReduce程序进行调优。
首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。如图3 所示。这是一个多路归并过程,最大归并路数由io.sort.factor 控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output为true 启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程个数由tasktracker.http.threads 控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到Shuffle的Reduce部分。Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定mapred.reduce.parallel.copies来改变线程数。
Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除。
如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。在这一步之后,系统不再把5 个中间文件归并成一个,而是排序后直接“喂”给Reduce 函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40 个文件,并不是每次都归并10 个最终得到4 个文件,相反第一次只归并4 个文件,然后再实现三次归并,每次10 个,最终得到4 个归并好的文件和6 个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce 函数那里。在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的Shuffle 和Sort 分析完毕。
最近整了很长一段时间才了解了map reduce的工作原理,shuffle是mapreduce的心脏,了解了这个过程,有助于编写效率更高的mapreduce程序和hadoop调优。自己画了一幅流程图(点击查看全图):
另外,还找到一篇文章,很好,引用一下。
Hadoop
是Apache 下的一个项目,由HDFS、MapReduce、HBase、Hive 和ZooKeeper等成员组成。其中,HDFS 和MapReduce 是两个最基础最重要的成员。
HDFS是Google GFS 的开源版本,一个高度容错的分布式文件系统,它能够提供高吞吐量的数据访问,适合存储海量(PB 级)的大文件(通常超过64M),其原理如下图所示:
采用Master/Slave 结构。NameNode 维护集群内的元数据,对外提供创建、打开、删除和重命名文件或目录的功能。DatanNode 存储数据,并提负责处理数据的读写请求。DataNode定期向NameNode 上报心跳,NameNode 通过响应心跳来控制DataNode。
InfoWord将MapReduce 评为2009 年十大新兴技术的冠军。MapReduce 是大规模数据(TB 级)计算的利器,Map 和Reduce 是它的主要思想,来源于函数式编程语言,它的原理如下图所示:Map负责将数据打散,Reduce负责对数据进行聚集,用户只需要实现map 和reduce 两个接口,即可完成TB级数据的计算,常见的应用包括:日志分析和数据挖掘等数据分析应用。另外,还可用于科学数据计算,如圆周率PI 的计算等。Hadoop MapReduce的实现也采用了Master/Slave 结构。Master 叫做JobTracker,而Slave 叫做TaskTracker。用户提交的计算叫做Job,每一个Job会被划分成若干个Tasks。JobTracker负责Job 和Tasks 的调度,而TaskTracker负责执行Tasks。
MapReduce中的Shuffle和Sort分析
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce的数据流图如图1 所示:
本文的重点是剖析MapReduce的核心过程----Shuffle和Sort。在本文中,Shuffle是指从Map产生输出开始,包括系统执行排序以及传送Map输出到Reducer作为输入的过程。在这里我们将去探究Shuffle是如何工作的,因为对基础的理解有助于对MapReduce程序进行调优。
首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。如图3 所示。这是一个多路归并过程,最大归并路数由io.sort.factor 控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output为true 启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程个数由tasktracker.http.threads 控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到Shuffle的Reduce部分。Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定mapred.reduce.parallel.copies来改变线程数。
Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除。
如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。在这一步之后,系统不再把5 个中间文件归并成一个,而是排序后直接“喂”给Reduce 函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40 个文件,并不是每次都归并10 个最终得到4 个文件,相反第一次只归并4 个文件,然后再实现三次归并,每次10 个,最终得到4 个归并好的文件和6 个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce 函数那里。在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的Shuffle 和Sort 分析完毕。
这篇关于MapReduce中的Shuffle和Sort分析 combine分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!