hadoop2提交到Yarn: Mapreduce执行过程reduce分析3

2024-08-27 11:58

本文主要是介绍hadoop2提交到Yarn: Mapreduce执行过程reduce分析3,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文

问题导读:
1.Reduce类主要有哪三个步骤?
2.Reduce的Copy都包含什么过程?
3.Sort主要做了哪些工作?






4.4 Reduce类4.4.1 Reduce介绍

整完了Map,接下来就是Reduce了。YarnChild.main()—>ReduceTask.run()。ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。
4.4.2 Copy
Copy就是从执行各个Map任务的节点获取map的输出文件。这是由ReduceTask.ReduceCopier 类来负责。ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器。如果大小超过一定阈值就写到磁盘,否则放入内存,在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多。
Step1:
    首先在ReduceTask的run方法中,通过如下配置来mapreduce.job.reduce.shuffle.consumer.plugin.class装配shuffle的plugin。默认的实现是Shuffle类:
  1.      Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); 
  2.      shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  3.      LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
复制代码

Step2:
    初始化上述的plugin后,执行其run方法,得到RawKeyValueIterator的实例。
run方法的执行步骤如下:
Step2.1:
    量化Reduce的事件数目:
  1. int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
  2.      int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
复制代码
Step2.2:
生成map的完成状态获取线程,并启动此线程:
  1. final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch);

  2.   eventFetcher.start();
复制代码

获取已经完成的Map信息,如Map的host、mapId等放入ShuffleSchedulerImpl中的Set<MapHost>中便于下面进行数据的拷贝传输。
  1.   URI u = getBaseURI(reduceId, event.getTaskTrackerHttp()); 
  2.        addKnownMapOutput(u.getHost() + ":" + u.getPort(), 
  3.            u.toString(), 
  4.            event.getTaskAttemptId()); 
  5.        maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
复制代码

Step2.3:
    在Shuffle类中启动初始化Fetcher线程组,并启动:
  1. boolean isLocal = localMapFiles != null;

  2.     final int numFetchers = isLocal ? 1 :

  3.       jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);

  4.     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];

  5.     if (isLocal) {

  6.       fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,

  7.           merger, reporter, metrics, this, reduceTask.getShuffleSecret(),

  8.           localMapFiles);

  9.       fetchers[0].start();

  10.     } else {

  11.       for (int i=0; i < numFetchers; ++i) {

  12.         fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,

  13.                                        reporter, metrics, this,

  14.                                        reduceTask.getShuffleSecret());

  15.         fetchers[i].start();

  16.       }

  17.     }
复制代码

线程的run方法就是进行数据的远程拷贝:
  1. try { 
  2.           // If merge is on, block 
  3.           merger.waitForResource(); 

  4.           // Get a host to shuffle from
  5.           host = scheduler.getHost(); 
  6.           metrics.threadBusy(); 
  7.           // Shuffle 
  8.           copyFromHost(host); 
  9.         } finally { 
  10.           if (host != null) { 
  11.             scheduler.freeHost(host); 
  12.             metrics.threadFree();  
  13.           } 
  14.         }
复制代码

Step2.4:
来看下这个copyFromHost方法。主要是就是使用HttpURLConnection,实现远程数据的传输。
建立连接之后,从接收到的Stream流中读取数据。每次读取一个map文件。
  1. TaskAttemptID[] failedTasks = null;

  2.       while (!remaining.isEmpty() && failedTasks == null) {

  3.         failedTasks = copyMapOutput(host, input, remaining);

  4.       }
复制代码
上面的copyMapOutput方法中,每次读取一个mapid,根据MergeManagerImpl中的reserve函数,检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小,此配置的默认值
是当前Runtime的maxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值,Buffer.percent的默认值为0.90。
如果mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。在接下来的操作中,map的输出写入到local临时文件中。
如果没有超过此大小,生成一个InMemoryMapOutput实例。在接下来操作中,直接把map输出写入到内存。
最后,执行ShuffleScheduler.copySucceeded完成文件的copy,调用mapout.commit函数,更新状态或者触发merge操作。
Step2.5:
    等待上面所有的拷贝完成之后,关闭相关的线程。
  1. eventFetcher.shutDown();   

  2.     // Stop the map-output fetcher threads
  3.     for (Fetcher<K,V> fetcher : fetchers) {
  4.       fetcher.shutDown();
  5.     }   

  6.     // stop the scheduler
  7.     scheduler.close(); 

  8.     copyPhase.complete(); // copy is already complete
  9.     taskStatus.setPhase(TaskStatus.Phase.SORT);
  10.     reduceTask.statusUpdate(umbilical);
复制代码

Step2.6:
执行最终的merge操作,由Shuffle中的MergeManager完成:
  1. public RawKeyValueIterator close() throws Throwable {

  2.     // Wait for on-going merges to complete

  3.     if (memToMemMerger != null) {

  4.       memToMemMerger.close();

  5.     }

  6.     inMemoryMerger.close();

  7.     onDiskMerger.close();

  8.    

  9.     List<InMemoryMapOutput<K, V>> memory =

  10.       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);

  11.     inMemoryMergedMapOutputs.clear();

  12.     memory.addAll(inMemoryMapOutputs);

  13.     inMemoryMapOutputs.clear();

  14.     List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);

  15.     onDiskMapOutputs.clear();

  16.     return finalMerge(jobConf, rfs, memory, disk);

  17.   }
复制代码

Step3:
释放资源。
  1. mapOutputFilesOnDisk.clear();
复制代码

  Copy完毕。
4.4.3 Sort
    Sort(其实相当于合并)就相当于排序工作的一个延续,它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过此过程后,会产生一个合并了所有(所有并不准确)Map任务输出文件的新文件,而那些从其他各个服务器搞过来的 Map任务输出文件会删除。根据hadoop是否分布式来决定调用哪种排序方式。
    在上面的4.3.2节中的Step2.4结束之后就会触发此操作。
4.4.4 Reduce
    经过上面的步骤之后,回到ReduceTask中的run方法继续往下执行,调用runNewReducer。创建reducer:
  1. org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
  2.      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
  3.         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
复制代码

并执行其run方法,此run方法就是我们的org.apache.hadoop.mapreduce.Reducer中的run方法。
  1. public void run(Context context) throws IOException, InterruptedException {

  2.     setup(context);

  3.     try {

  4.       while (context.nextKey()) {

  5.         reduce(context.getCurrentKey(), context.getValues(), context);

  6.         // If a back up store is used, reset it

  7.         Iterator<VALUEIN> iter = context.getValues().iterator();

  8.         if(iter instanceof ReduceContext.ValueIterator) {

  9.           ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();       

  10.         }

  11.       }

  12.     } finally {

  13.       cleanup(context);

  14.     }

  15.   }

  16. }
复制代码

while的循环条件是ReduceContext.nextKey()为真,这个方法就在ReduceContext中实现的,这个方法的目的就是处理下一个唯一的key,因为reduce方法的输入数据是分组的,所以每次都会处理一个key及这个key对应的所有value,又因为已经将所有的Map Task的输出拷贝过来而且做了排序,所以key相同的KV对都是挨着的。
    nextKey方法中,又会调用nextKeyValue方法来尝试去获取下一个key值,并且如果没数据了就会返回false,如果还有数据就返回true。防止获取重复的数据就在这里做的处理。
接下来就是调用用户自定义的reduce方法了。
  1. public void reduce(Text key, Iterable<IntWritable> values,

  2.                        Context context

  3.                        ) throws IOException, InterruptedException {

  4.       int sum = 0;

  5.       for (IntWritable val : values) {

  6.         sum += val.get();

  7.       }

  8.       result.set(sum);

  9.       context.write(key, result);

  10.     }
复制代码


这篇关于hadoop2提交到Yarn: Mapreduce执行过程reduce分析3的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1111602

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

maven 编译构建可以执行的jar包

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~ 专栏导航 Python系列: Python面试题合集,剑指大厂Git系列: Git操作技巧GO

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

HTML提交表单给python

python 代码 from flask import Flask, request, render_template, redirect, url_forapp = Flask(__name__)@app.route('/')def form():# 渲染表单页面return render_template('./index.html')@app.route('/submit_form',