MapReduce源码分析——ReduceTask流程分析

2024-01-10 10:38

本文主要是介绍MapReduce源码分析——ReduceTask流程分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

Reduce会从Mapper任务中拉取很多小文件,小文件内部有序,但是整体是没序的,Reduce会合并小文件,然后套个归并算法,变成一个整体有序的文件。

Reducer 主要有3个基本的过程:

1.Shuffle阶段
Reducer会通过网络IO将Mapper端的排序输出给复制过来。

2.Sort阶段

  • 按key对reducer输入进行排序(因为不同的mapper可能输出相同的key)
  • shuffle和sort阶段同时发生,即在拉去mapper输出时,它们被合并。

3.Reduce阶段
在此阶段中,对排序输入中的每个group调用reduce(object,iterable,reducer.context)方法。reduce任务的输出通常通过reducer.context.write(object,object)写入记录编写器。reduce的输出没有重新排序。

源码解析

1.Shuffle阶段源码分析

@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());if (isMapOrReduce()) {copyPhase = getProgress().addPhase("copy");sortPhase  = getProgress().addPhase("sort");reducePhase = getProgress().addPhase("reduce");}//发送task任务报告,与父进程做交流TaskReporter reporter = startReporter(umbilical);//判断用的是新的MapReduceAPI还是旧的APIboolean useNewApi = job.getUseNewReducer();//核心代码,初始化任务initialize(job, getJobID(), reporter, useNewApi);//Reduce任务有4种,Job-setup Task, Job-cleanup Task, Task-cleanup Task和ReduceTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}// Initialize the codeccodec = initCodec();RawKeyValueIterator rIter = null;//使用的shuffle插件ShuffleConsumerPlugin shuffleConsumerPlugin = null;Class combinerClass = conf.getCombinerClass();CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter,shuffledMapsCounter,reduceShuffleBytes, failedShuffleCounter,mergedMapOutputsCounter,taskStatus, copyPhase, sortPhase, this,mapOutputFile, localMapFiles);//初始化shuffle插件,核心代码shuffleConsumerPlugin.init(shuffleContext);//跑shuflle核心代码,此步骤,会通过网络IO将Map端的输出给拉过来,并且进行合并操作~~~rIter = shuffleConsumerPlugin.run();// free up the data structuresmapOutputFilesOnDisk.clear();// sort is completesortPhase.complete();                         setPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();//分组比较RawComparator comparator = job.getOutputValueGroupingComparator();//如果前面3个任务都不是,执行的就是最主要的ReduceTask,根据新老API调用不同的方法if (useNewApi) {runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}

首先是shuffle阶段,Reduce进程启动一些数据copy线程,通过HTTP方式请求MapTask所在的NodeManager以获取输出文件。

Reducer是如何知道要去哪些机器取数据呢?

一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据。数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。


NodeManager需要为分区文件运行reduce任务。并且reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。而每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。

reduce任务有少量复制线程,因此能够并行取得map输出。默认线程数为5,但这个默认值可以通过mapreduce.reduce.shuffle.parallelcopies属性进行设置。

由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,所有的reduce就开始尝试从完成的mapper中下载该reduce对应的partition的部分数据(shuffle),因此map和reduce是交叉进行的。

 @Overridepublic RawKeyValueIterator run() throws IOException, InterruptedException {// Scale the maximum events we fetch per RPC call to mitigate OOM issues// on the ApplicationMaster when a thundering herd of reducers fetch events// TODO: This should not be necessary after HADOOP-8942int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);// Start the map-completion events fetcher threadfinal EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,maxEventsToFetch);eventFetcher.start();// Start the map-output fetcher threadsboolean isLocal = localMapFiles != null;final int numFetchers = isLocal ? 1 :jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];if (isLocal) {fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,merger, reporter, metrics, this, reduceTask.getShuffleSecret(),localMapFiles);fetchers[0].start();} else {for (int i=0; i < numFetchers; ++i) {fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret());fetchers[i].start();}}// Wait for shuffle to complete successfullywhile (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {reporter.progress();synchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}}// Stop the event-fetcher threadeventFetcher.shutDown();// Stop the map-output fetcher threadsfor (Fetcher<K,V> fetcher : fetchers) {fetcher.shutDown();}// stop the schedulerscheduler.close();copyPhase.complete(); // copy is already completetaskStatus.setPhase(TaskStatus.Phase.SORT);reduceTask.statusUpdate(umbilical);// Finish the on-going merges...RawKeyValueIterator kvIter = null;try {kvIter = merger.close();} catch (Throwable e) {throw new ShuffleError("Error while doing final merge " , e);}// Sanity checksynchronized (this) {if (throwable != null) {throw new ShuffleError("error in shuffle in " + throwingThreadName,throwable);}}return kvIter;}

将Map端复制过来的数据先放入内存缓冲区中
Merge有3种形式,分别是内存到内存,内存到磁盘,磁盘到磁盘。默认情况下第一种形式不启用,第二种Merge方式一直在运行(spill阶段)直到结束,然后启用第三种磁盘到磁盘的Merge方式生成最终的文件。(注意:为了合并,压缩的map输出都必须在内存中被解压缩)

一旦内存缓冲区达到缓存溢出到磁盘的阈值时(默认0.66),或达到Map任务在缓存溢出前能够保留在内存中的输出个数的阈值(默认1000),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它以降低写入硬盘的数据量。

随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。
复制完所有map输出后,reduce任务进入sort排序阶段(更恰当的说法是merge合并阶段,因为这个阶段的主要工作是执行了归并排序),循环进行归并排序(维持其顺序排序)。同时合并的文件流的数量由mapreduce.task.io.sort.factor属性决定(默认10)。

这里的merge和map端的merge动作类似,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才spill磁盘。这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置。

这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源码里面写死了) 来设置,这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。JVM的heapsize的70%。内存到磁盘merge的启动门限可以通过mapreduce.reduce.shuffle.merge.percent(default0.66)配置。

也就是说,如果该reduce task的最大heap使用量(通常通过mapreduce.admin.reduce.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。假设 mapreduce.reduce.shuffle.input.buffer.percent 为0.7,reducetask的max heapsize为1G,那么用来做下载数据缓存的内存就为大概700MB左右。这700M的内存,跟map端一样,也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷(刷磁盘前会先做sortMerge)。

这个限度阈值也是可以通过参数 mapreduce.reduce.shuffle.merge.percent(default0.66)来设定。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。这种merge方式一直在运行,直到没有map端的数据时才结束,然后启动磁盘到磁盘的merge方式生成最终的那个文件。

runNewReducer源码解读

void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {// wrap value iterator to report progress.//真正的迭代器final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};// make a task context so we can get the classesorg.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);// make a reducerorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),//构建上下文的时候把迭代器传进来rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,//比较器valueClass);try {//构建完上下文之后运行Redude的Run方法reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}

首先这里会将,一些属性封装到reducerContext这个对象中,其中包括了最重要的rIter这个对象,这个迭代器对象复制真正的读取数据的工作。之后调用的就是run方法,开始执行setup方法以及我们自定义的reduce方法,下面先看看ReduceContextImpl中是如何完成迭代的!

public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,//把迭代器传给输入对象InputRawKeyValueIterator input, Counter inputKeyCounter,Counter inputValueCounter,RecordWriter<KEYOUT,VALUEOUT> output,OutputCommitter committer,StatusReporter reporter,RawComparator<KEYIN> comparator,Class<KEYIN> keyClass,Class<VALUEIN> valueClass) throws InterruptedException, IOException{super(conf, taskid, output, committer, reporter);this.input = input;this.inputKeyCounter = inputKeyCounter;this.inputValueCounter = inputValueCounter;this.comparator = comparator;this.serializationFactory = new SerializationFactory(conf);this.keyDeserializer = serializationFactory.getDeserializer(keyClass);this.keyDeserializer.open(buffer);this.valueDeserializer = serializationFactory.getDeserializer(valueClass);this.valueDeserializer.open(buffer);hasMore = input.next();this.keyClass = keyClass;this.valueClass = valueClass;this.conf = conf;this.taskid = taskid;}
/** Start processing next unique key. *///实际上Reduce中run方法中的contect.netKey调用的逻辑public boolean nextKey() throws IOException,InterruptedException {///第一次假 放空while (hasMore && nextKeyIsSame) {nextKeyValue();}if (hasMore) {if (inputKeyCounter != null) {inputKeyCounter.increment(1);}return nextKeyValue();} else {return false;}}/*** Advance to the next key/value pair.*/@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!hasMore) {key = null;value = null;return false;}firstValue = !nextKeyIsSame;DataInputBuffer nextKey = input.getKey();currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());key = keyDeserializer.deserialize(key);DataInputBuffer nextVal = input.getValue();buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()- nextVal.getPosition());value = valueDeserializer.deserialize(value);currentKeyLength = nextKey.getLength() - nextKey.getPosition();currentValueLength = nextVal.getLength() - nextVal.getPosition();if (isMarked) {backupStore.write(nextKey, nextVal);}hasMore = input.next();if (hasMore) {nextKey = input.getKey();//判断当前key和下一个Key是否相等。nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(),nextKey.getData(),nextKey.getPosition(),nextKey.getLength() - nextKey.getPosition()) == 0;} else {nextKeyIsSame = false;}inputValueCounter.increment(1);return true;}public KEYIN getCurrentKey() {return key;}@Overridepublic VALUEIN getCurrentValue() {return value;}

接下来是Reduce中的run的方法

 public void run(Context context) throws IOException, InterruptedException {setup(context);try {//实际上在这一步里实际上调用了NextKeyValue的值更新了 hasmore,nextKeyisSame,Key,Value的值while (context.nextKey()) {       reduce(context.getCurrentKey(), context.getValues(), context);// If a back up store is used, reset itIterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        }}} finally {cleanup(context);}}

nextKey()会调用真的迭代器的方法,会对nextKeyIsSame进行判断,还对hasMore进行判断。
getValues()会返回一个,可迭代的对象,ValueIterable类型的。ValueIterable中iterator又会返回一个ValueIterator迭代器对象,下面看了看ValueIterator的源码。该类是ReduceContextImpl的内部类。

protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {private boolean inReset = false;private boolean clearMarkFlag = false;@Overridepublic boolean hasNext() {try {if (inReset && backupStore.hasNext()) {return true;} } catch (Exception e) {e.printStackTrace();throw new RuntimeException("hasNext failed", e);}return firstValue || nextKeyIsSame;}@Overridepublic VALUEIN next() {if (inReset) {try {if (backupStore.hasNext()) {backupStore.next();DataInputBuffer next = backupStore.nextValue();buffer.reset(next.getData(), next.getPosition(), next.getLength()- next.getPosition());value = valueDeserializer.deserialize(value);return value;} else {inReset = false;backupStore.exitResetMode();if (clearMarkFlag) {clearMarkFlag = false;isMarked = false;}}} catch (IOException e) {e.printStackTrace();throw new RuntimeException("next value iterator failed", e);}} // if this is the first record, we don't need to advanceif (firstValue) {firstValue = false;return value;}// if this isn't the first record and the next key is different, they// can't advance it here.if (!nextKeyIsSame) {throw new NoSuchElementException("iterate past last value");}// otherwise, go to the next key/value pairtry {//这个迭代器自身是没有数据的,在Next中调用的还是 nextKeyValue,在这个NextKeyValue中调用的是Input的输入数据nextKeyValue();return value;} catch (IOException ie) {throw new RuntimeException("next value iterator failed", ie);} catch (InterruptedException ie) {// this is bad, but we can't modify the exception list of java.utilthrow new RuntimeException("next value iterator interrupted", ie);        }}

可以看到这个迭代器自身对数据的迭代是通过之前的真实迭代器对象rIter来完成的,在Next中调用的还是rIter的 nextKeyValue方法,在这个NextKeyValue中调用的是Input的输入数据。

真正迭代器中有一个重要的标识NextKeyisSame,这个标识会被hasNext方法用到然后判断下一个key是否 相同,直到一组数据。

nextKeyValue该方法会对key和value进行赋值,同时调用hasmore对nextKeyIsSame进行判定是否是true,之后调用分组比较器,返回0则为true。

这里需要注意,自定义的reduce方法,如果迭代了value,每个value对应的key也是会随之迭代的。因为key这块是按引用传递。会改变同一块内存中的数据。也就是说通过值的迭代,也迭代key。

上面还需要注意的地方是。

如果用户设置了比较器会用用户自定义的分组比较器,如果用户没设置就用排序比较器当做分组比较器,否则用默认的key自带的比较器。

这篇关于MapReduce源码分析——ReduceTask流程分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590504

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud