Hadoop2源码分析-MapReduce篇

2024-05-27 12:32

本文主要是介绍Hadoop2源码分析-MapReduce篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.概述

  前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapreduce部分,其内容目录如下所示:

  • MapReduce V1
  • MapReduce V2
  • MR V1和MR V2的区别
  • MR V2的重构思路

  本篇文章的源码是基于hadoop-2.6.0-src.tar.gz来完成的。代码下载地址,请参考《Hadoop2源码分析-准备篇》。

2.MapReduce V1

  下面我们给出第一代的MapReduce的架构图,如下所示:

  上图描述了第一代MapReduce框架的流程以及设计思路,下面为大家解释下这张图的具体含义:

  • 当我们编写完MR作业后,需要通过JobClient来提交一个job,提交的信息会发送到JobTracker模块,这个模块是第一代MapReduce计算框架的核心之一,它负责与集群中的其他节点维持心跳,为提交的作业分配资源,管理提交的作业的正常运作(失败,重启等)。
  • 第一代MapReduce的另一个核心的功能是TaskTracker,在各个TaskTracker安装节点上,它的主要功能是监控自己所在节点的资源使用情况。
  • TaskTracker监控当前节点的Tasks的运行情况,其中包含Map Task和Reduce Task,最后由Reduce Task到Reduce阶段,将结果输送到HDFS的文件系统中;其中的具体流程如图中描述的1-7步骤。TaskTracker在监控期间,需要把这些信息通过心跳机制发送给JobTracker,JobTracker收集到这些信息后,给新提交的作业分配其他的资源,避免重复资源分配。

  可以看出,第一代的MapReduce架构简单清晰,在刚面世的那几年,也曾获得总多企业的支持和认可。但随着分布式集群的规模和企业业务的增长,第一代框架的问题也逐渐暴露出来,主要有以下问题:

  • JobTracker是第一代MapReduce的入口点,若是JobTracker服务宕机,整个服务将会瘫痪,存在单点问题。
  • JobTracker负责的事情太多,完成来太多的任务,占用过多的资源,当Job数非常多的时候,会消耗很多内存,容易出现性能瓶颈。
  • 对TaskTracker而言,Task担当的角色过于简单,没有考虑到CPU及内存的使用情况,若存在多个大内存的Task被集中调度,容易出现内存溢出。
  • 另外,TaskTracker把资源强制分为map task slot和reduce task slot,若是MR任务中只存在其中一个(map或是reduce),会出现资源浪费的情况,资源利用率低。
  • 从开发人员的角度来说,源码分析的时候,阅读性不够友好,代码量大,任务不清晰,给开发人员在修复BUG和维护的时候增大了难度。

3.MapReduce V2

  在Hadoop V2中,加入了YARN的概念,所以MapReduce V2的架构和MapReduce V1的架构有些许的变化,如下图所示:

  从上图中,我们可以清晰的看出,架构重构的基本思想在于将JobTracker的两个核心的功能单独分离成独立的组件了。分离后的组件分别为资源管理(Applications Manager)和任务调度器(Resource Scheduler)。新的资源管理器(Resource Manager)管理整个系统的资源分配,而每一个Node Manager下的App Master(Application Master)负责对应的调度和协调工作,而在实际中,App Master从Resource Manager上获得资源,让Node Manager来协同工作和任务监控。

  从图中我们可以看出,Resource Manager是支持队列分层的,这些队列可以从集群中获取一定比例的资源,也就是说Resource Manager可以算得上是一个调度器,它在执行的过程当中本身不负责对应用的监控和状态的定位跟踪。

  Resource Manager在内存,CPU,IO等方面是动态分配的,相比第一代MapReduce计算框架,在资源使用上大大的加强了资源使用的灵活性。上图中的Node Manager是一个代理框架,负责应用程序的执行,监控应用程序的资源利用率,并将信息上报给资源管理器。另外,App Master所担当的角色职责包含:在运行任务是,向任务调度器动态的申请资源,对应用程序的状态进行监控,处理异常情况,如若出现问题,会在其他节点进行重启。

4.MR V1和MR V2的区别

  在和大家分析完 MR V1 和 MR V2 的架构后,我们来看看二者有哪些变化。在MR V2版本中,大部分的API接口都是兼容的保留下来,MR V1中的JobTracker和TaskTracker被替换成相应的Resource Manager,Node Manager。对比于MR V1中的Task的监控,重启等内热都交由App Master来处理,Resource Manager提供中心服务,负责资源的分配与调度。Node Manager负责维护Container的状态,并将收集的信息上报给Resource Manager,以及负责和Resource Manager维持心跳。

  MR V2中加入Yarn的概念后,体现以下设计优点:

  • 减少来资源消耗,让监控每一个作业更加分布式了。
  • 能够支持更多的变成模型,如:Spark,Storm,以及其他待开发的编程模型。
  • 将资源以内存量的概念来描述,比MR V1中的slot更加合理。

  另外,在工程目录结构也有了些许的变化,如下表所示:

改变目录 MR V1 MR V2 描述
配置文件 ${HADOOP_HOME}/conf ${HADOOP_HOME}/etc/hadoop MR V2中的配置文件路径修改为etc/hadoop目录下
脚本 ${HADOOP_HOME}/bin ${HADOOP_HOME}/sbin和${HADOOP_HOME}/bin 在MR V2中启动,停止等命令都位于sbin目录下,操作hdfs的命令存放在bin目录下
JAVA_HOME ${HADOOP_HOME}/conf/hadoop-env.sh ${HADOOP_HOME}/etc/hadoop/hadoop-env.sh和${HADOOP_HOME}/etc/hadoop/yarn-env.sh 在MR V2中需要同时在hadoop-env.sh和yarn-env.sh中配置JDK的路径

  由于添加Yarn特性,与第一代MR的框架变化较大,第一代的核心配置文件许多项也在新框架中摒弃了,具体新框架的核心配置文件信息,请参考《配置高可用的Hadoop平台》。

5.MR V2的重构思路

  在V2中的MapReduce重构的思路主要有以下几点:

  • 层次化的管理:分层级对资源的调度和分配进行管理。
  • 资源管理方式:由第一代的slot作为资源单位元,调整为更加细粒的内存单位元。
  • 编程模型拓展:V2版的设计支持除MapReduce以外的编程模型。

  MapReduce:WordCount V2,代码如下:

复制代码
package cn.hdfs.mapreduce.example;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;/**
* @date Apr 17, 2015
*
* @author dengjie
*/
public class WordCount2 {public static class TokenizerMapperextends Mapper<Object, Text, Text, IntWritable>{static enum CountersEnum { INPUT_WORDS }private final static IntWritable one = new IntWritable(1);private Text word = new Text();private boolean caseSensitive;private Set<String> patternsToSkip = new HashSet<String>();private Configuration conf;private BufferedReader fis;@Overridepublic void setup(Context context) throws IOException,InterruptedException {conf = context.getConfiguration();caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);if (conf.getBoolean("wordcount.skip.patterns", true)) {URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();for (URI patternsURI : patternsURIs) {Path patternsPath = new Path(patternsURI.getPath());String patternsFileName = patternsPath.getName().toString();parseSkipFile(patternsFileName);}}}private void parseSkipFile(String fileName) {try {fis = new BufferedReader(new FileReader(fileName));String pattern = null;while ((pattern = fis.readLine()) != null) {patternsToSkip.add(pattern);}} catch (IOException ioe) {System.err.println("Caught exception while parsing the cached file '"+ StringUtils.stringifyException(ioe));}}@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String line = (caseSensitive) ?value.toString() : value.toString().toLowerCase();for (String pattern : patternsToSkip) {line = line.replaceAll(pattern, "");}StringTokenizer itr = new StringTokenizer(line);while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);Counter counter = context.getCounter(CountersEnum.class.getName(),CountersEnum.INPUT_WORDS.toString());counter.increment(1);}}}public static class IntSumReducerextends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);String[] remainingArgs = optionParser.getRemainingArgs();if (!(remainingArgs.length != 2 || remainingArgs.length != 4)) {System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");System.exit(2);}Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount2.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);List<String> otherArgs = new ArrayList<String>();for (int i=0; i < remainingArgs.length; ++i) {if ("-skip".equals(remainingArgs[i])) {job.addCacheFile(new Path(remainingArgs[++i]).toUri());job.getConfiguration().setBoolean("wordcount.skip.patterns", true);} else {otherArgs.add(remainingArgs[i]);}}FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
复制代码

  Spark:WordCount,代码如下:

复制代码
package com.hdfs.spark.example/**
* @date Apr 17, 2015
*
* @author dengjie
*/
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._/**
* 统计字符出现次数
*/
object WordCount {def main(args: Array[String]) {if (args.length < 1) {System.err.println("Usage: <file>")System.exit(1)}val conf = new SparkConf()val sc = new SparkContext(conf)     val line = sc.textFile(args(0))      line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println)      sc.stop()   
} 
}
复制代码

这篇关于Hadoop2源码分析-MapReduce篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007487

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。