MapReduce简介,结构组成,运行过程,WordCount...

2024-05-13 12:38

本文主要是介绍MapReduce简介,结构组成,运行过程,WordCount...,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

hadoop四大模块
-------------------
common
hdfs        //namenode + datanode + secondarynamenode
mapred

yarn        //resourcemanager + nodemanager

1.MapReduce简介

Hadoop最主要的两部分Hdfs,MapReduce。其中Hdfs是用来分布式存储数据。在面对着由Hdfs存储的巨大数据量时,设计面向大数据并行处理的计算模型势在必得。

  MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。Google公司发明了MapReduce之后首先用其重新改写了其搜索引擎中的Web文档索引处理系统。但由于MapReduce可以普遍应用于很多大规模数据的计算问题,因此自发明MapReduce以后,Google公司内部进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google公司内有上万个各种不同的算法问题和程序都使用MapReduce进行处理。

 

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。
Hadoop的思想来源于Google的几篇论文,Google的那篇MapReduce论文里说:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。这句话提到了MapReduce思想的渊源,大致意思是,MapReduce的灵感来源于函数式语言(比如Lisp)中的内置函数map和reduce。函数式语言也算是阳春白雪了,离我们普通开发者总是很远。简单来说,在函数式语言里,map表示对一个列表(List)中的每个元素做计算,reduce表示对一个列表中的每个元素做迭代计算。它们具体的计算是通过传入的函数来实现的,map和reduce提供的是计算的框架。不过从这样的解释到现实中的MapReduce还太远,仍然需要一个跳跃。再仔细看,reduce既然能做迭代计算,那就表示列表中的元素是相关的,比如我想对列表中的所有元素做相加求和,那么列表中至少都应该是数值吧。而map是对列表中每个元素做单独处理的,这表示列表中可以是杂乱无章的数据。这样看来,就有点联系了。在MapReduce里,Map处理的是原始数据,自然是杂乱无章的,每条数据之间互相没有关系;到了Reduce阶段,数据是以key后面跟着若干个value来组织的,这些value有相关性,至少它们都在一个key下面,于是就符合函数式语言里map和reduce的基本思想了。 [2]
这样我们就可以把MapReduce理解为,把一堆杂乱无章的数据按照某种特征归纳起来,然后处理并得到最后的结果。Map面对的是杂乱无章的互不相关的数据,它解析每个数据,从中提取出key和value,也就是提取了数据的特征。经过MapReduce的Shuffle阶段之后,在Reduce阶段看到的都是已经归纳好的数据了,在此基础上我们可以做进一步的处理以便得到结果。这就回到了最初,终于知道MapReduce为何要这样设计。 [2]
2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所需要的解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。
MapReduce的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。尽管MapReduce还有很多局限性,但人们普遍公认,MapReduce是到目前为止最为成功、最广为接受和最易于使用的大数据并行处理技术。MapReduce的发展普及和带来的巨大影响远远超出了发明者和开源社区当初的意料,以至于马里兰大学教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一书的作者Jimmy Lin在书中提出:MapReduce改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到目前为止所见到的最为成功的基于大规模计算资源的计算模型。

2.MapReduce结构组成

MapReduce由两个阶段组成:Map阶段和Reducer阶段。用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。其中这两个函数的形参是key,value对,表示函数的输入信息。

Map与Reducer各有两对key,value对,分别是输入key,value对,以及输出key,value对。

其中Map的输入key,value对是读取数据存储之后生成的,输出key,value对则是对读取的数据进行处理产生的。

Reducer的输入key,value对是Map的输出key,value对,输出key,value对则是最终要的结果。

3.MapReduce运行过程

在面对巨大的数据存储时,如何来设计计算模型呢?MapReduce计算模型采用的是分而治之的原理。如下图所示:

                  

 

对于求1+5+7+3+4+9+3+5+6的时候,MapReduce使用切片的形式,将数据切分成3(1+5+7,3+4+9,3+5=6)份,然后分别在不同的Map节点上同时进行运算,这样对大数据处理而言速度效率机会快很多,最后把各个节点上算出的数据13,16,14在Reduce进行最后的整合,得出最终的结果。

那么数据在Map端与Reducer端是怎样传输的,以及传输过程中都发生了些什么呢?让我们来看一下:

 

 

假设数据如上图所示。

第一步,对于存储在Hdfs上的数据,首先会使用InputFormat——一个抽象类,其实现类包括文本文件实现类:TextInputFormat(默认输入实现类),序列文件输入类:SequenceFileInputFormat等。进行输入。

第二步,为了提高运算效率,对巨大数据进行数据切片,并行计算。切片的计算方法是对于一个文件(最小切片minSize大小是1,最大切片minSplit大小是Long.Max,数据块大小blockSize)取3者中的中间值。

第三步,进行数据切割之后获取切片的数量后开启Map过程,有多少个切片,就有多少个Map。Map过程对输入的<K,V>对进行处理,然后输出<K,V>对。

第四步,在Map阶段,对输出的<K,V>对进行分区,目的是把当前输出的<K,V>划分到哪一个Reducer中,默认的分区函数是对K进行Hash。

第五步,分区之后的<K,V>对会保存到内存缓冲区,当数据量达到缓冲区的百分之80之后,缓冲区会开启一个线程,将数据写入到磁盘的一个临时文件中,在写入之前,对分区内的数据进行排序,排序的原则是按照字典顺序进行排序,并且把相同的Key的进行归并,例如把<hello,1>,<hello,2>,<hello,1>归并成<hello,{1,2,1}>

第六步,多个Map任务的输出,将进行排序并且归并。例如Map1中的1号分区<hello,{1,2,1}>与Map2中1号分区<hello,{1,1,1}>归并成<hello,{1,2,1,1,1,1}>。

第七步,每个Reducer(默认是1个,也可以自行进行配置)通过网络的拷贝,将自己对应的分区数据拷贝到自己的节点,然后进行业务逻辑处理,最后输出。

4.WordCount实例

根据上面的流程分析,我们来写一个经典的MapReduce案例——WordCount,统计文件中的单词数量。不要小看这个案例,百度中的热词搜索与其原理有点类似,统计出每日搜索最多的词。

数据文件内容如下

 

 

Map类

 

 

package com.hadoop.mr;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.mapreduce.Mapper;

 

import java.io.IOException;

 

/**

* @author zpx

* @Title: ${file_name}

* @Package ${package_name}

* @Description: WordCount的Mapper

* @date 2018/6/2811:13

*/

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        Text keyOut=new Text();

        IntWritable valueOut=new IntWritable();

        String[] arr = value.toString().split(" ");

        for(String s:arr){

           keyOut.set(s);

           valueOut.set(1);

            context.write(keyOut,valueOut);

        }

    }

}

Reduce类

 

 

package com.hadoop.mr;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

 

/**

* @author zpx

* @Title: ${file_name}

* @Package ${package_name}

* @Description: WordCount的Reduce

* @date 2018/6/2811:24

*/

public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

    @Override

    /**

      * @Description:一个分组只进入一次reduce

      * @param ${tags}

      * @return ${return_type}

      * @throws

      * @author zpx

      * @date 2018/6/30 0:15

      */

    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count=0;

        for(IntWritable v:values){

            count+=v.get();

        }

        context.write(key,new IntWritable(count));

    }

}

主函数类

 

 

package com.hadoop.mr;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

import java.io.IOException;

 

/**

* @author zpx

* @Title: ${file_name}

* @Package ${package_name}

* @Description: WordCount主函数

* @date 2018/6/2811:29

*/

public class WCApp{

    public static void main(String[] args) throws Exception {

        //在程序中指定hadoop.home.dir

//        System.setProperty("hadoop.home.dir","F://hadoop-dir" );

        Configuration conf=new Configuration();

        Job job=Job.getInstance(conf);

//        conf.set("fs.defaultFS", "file:///");//本地测试请将注释取消掉

        //设置job的各种属性

        job.setJobName("WCApp");                        //作业名称

        job.setJarByClass(WCApp.class);                 //搜索类

        job.setInputFormatClass(TextInputFormat.class); //设置输入文件格式

        //添加输入路径

        FileInputFormat.addInputPath(job,new Path(args[0]));

        //设置输出路径

        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //设置最大切片数

        //FileInputFormat.setMaxInputSplitSize(job,13);

        //最小切片数

        //FileInputFormat.setMinInputSplitSize(job,1L);

 

        //设置分区类

//        job.setPartitionerClass(MyPartitioner.class);   //设置自定义分区

 

        //设置合成类

//        job.setCombinerClass(WCReducer.class);          //设置combiner类

        job.setMapperClass(WCMapper.class);             //mapper类

        job.setReducerClass(WCReducer.class);           //reducer类

        //job.setNumReduceTasks(3);                       //reduce个数

        //设置Map的输入输出

        job.setMapOutputKeyClass(Text.class);           //

        job.setMapOutputValueClass(IntWritable.class);  //

        //设置Reducer的输入数据

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);     //

        job.waitForCompletion(true);

 

    }

 

将当前工程打成JAR包,然后放在集群上进行测试,命令:

hadoop jar mr.jar com.hadoop.mr.WCApp hdfs://s10/user/zpx/wc/data hdfs://s10/user/zpx/wc/out

查看结果

 

 

 

 

 

 

过程分析:使用MapReduce默认的数据输入格式TextInputFormat.class,对数据切片读取的过程中,输入Key是文件的偏移量(即每一行开始位置),Value是一整行数据,所以在Map类中要对数据进行切割,然后输出Key,Value。格式是<hello,1>等。进行了排序归并之后输入到Reduce端的K,V对格式是<hello,{1,1,1,1,1,1,1,1,1,1,1,1,1,1}>,<tom,{1,1,1,1,1,1,1}>,<tom0,{1}>,<tom0,{1}>,<tom1,{1}>,<tom2,{1,1,1}>,<tom3,{1}>,<tom4,{1}>,Reduce端只需要对相应Key的数据集合做迭代运算就可以了。最后生成Key,Value对输出即可。由于在此过程中要数据要在网络中传输,所以要对数据进行序列化,而Hadoop有自己的一套序列化标准(未使用Java的序列化,因为其麻烦,复杂),所以我们看到的Key,Value是Hadoop自己的序列化,Long->LongWritable,Int->IntWritable,String->Text

 

这篇关于MapReduce简介,结构组成,运行过程,WordCount...的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/985731

相关文章

Golang的CSP模型简介(最新推荐)

《Golang的CSP模型简介(最新推荐)》Golang采用了CSP(CommunicatingSequentialProcesses,通信顺序进程)并发模型,通过goroutine和channe... 目录前言一、介绍1. 什么是 CSP 模型2. Goroutine3. Channel4. Channe

SpringBoot 整合 Grizzly的过程

《SpringBoot整合Grizzly的过程》Grizzly是一个高性能的、异步的、非阻塞的HTTP服务器框架,它可以与SpringBoot一起提供比传统的Tomcat或Jet... 目录为什么选择 Grizzly?Spring Boot + Grizzly 整合的优势添加依赖自定义 Grizzly 作为

Java中的Opencv简介与开发环境部署方法

《Java中的Opencv简介与开发环境部署方法》OpenCV是一个开源的计算机视觉和图像处理库,提供了丰富的图像处理算法和工具,它支持多种图像处理和计算机视觉算法,可以用于物体识别与跟踪、图像分割与... 目录1.Opencv简介Opencv的应用2.Java使用OpenCV进行图像操作opencv安装j

mysql-8.0.30压缩包版安装和配置MySQL环境过程

《mysql-8.0.30压缩包版安装和配置MySQL环境过程》该文章介绍了如何在Windows系统中下载、安装和配置MySQL数据库,包括下载地址、解压文件、创建和配置my.ini文件、设置环境变量... 目录压缩包安装配置下载配置环境变量下载和初始化总结压缩包安装配置下载下载地址:https://d

springboot整合gateway的详细过程

《springboot整合gateway的详细过程》本文介绍了如何配置和使用SpringCloudGateway构建一个API网关,通过实例代码介绍了springboot整合gateway的过程,需要... 目录1. 添加依赖2. 配置网关路由3. 启用Eureka客户端(可选)4. 创建主应用类5. 自定

最新版IDEA配置 Tomcat的详细过程

《最新版IDEA配置Tomcat的详细过程》本文介绍如何在IDEA中配置Tomcat服务器,并创建Web项目,首先检查Tomcat是否安装完成,然后在IDEA中创建Web项目并添加Web结构,接着,... 目录配置tomcat第一步,先给项目添加Web结构查看端口号配置tomcat    先检查自己的to

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux使用nohup命令在后台运行脚本

《Linux使用nohup命令在后台运行脚本》在Linux或类Unix系统中,后台运行脚本是一项非常实用的技能,尤其适用于需要长时间运行的任务或服务,本文我们来看看如何使用nohup命令在后台... 目录nohup 命令简介基本用法输出重定向& 符号的作用后台进程的特点注意事项实际应用场景长时间运行的任务服

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot集成SOL链的详细过程

《SpringBoot集成SOL链的详细过程》Solanaj是一个用于与Solana区块链交互的Java库,它为Java开发者提供了一套功能丰富的API,使得在Java环境中可以轻松构建与Solana... 目录一、什么是solanaj?二、Pom依赖三、主要类3.1 RpcClient3.2 Public