用于大规模数据集(大于1TB)的并行运算的MapReduce是怎么实现的?

本文主要是介绍用于大规模数据集(大于1TB)的并行运算的MapReduce是怎么实现的?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

MapReduce 是一种编程模型,用于处理和生成大数据集。MapReduce 分为两个阶段:Map 阶段和 Reduce 阶段。

  1. Map 阶段:在这个阶段,输入数据被拆分成不同的数据块,这些数据块被分发到各个 Map 任务上。每个 Map 任务对输入的数据块进行处理并输出成一组键值对。具体的处理方式取决于编写 Map 任务的方式,这将由程序员决定。

  2. Shuffle 阶段:在 Map 阶段之后,系统会根据 Map 阶段输出的键值对进行排序和分区。这个过程是系统自动完成的,不需要程序员编写代码。

  3. Reduce 阶段:在这个阶段,一组键值对会送到同一个 Reduce 任务上。Reduce 任务会对这些键值对按照键来进行合并,并对每个键的所有值进行处理,输出处理结果。具体的处理方式也是由程序员编写的 Reduce 任务来决定。

通过 Map 和 Reduce 两个步骤,MapReduce 能够将大规模的数据计算工作分解成小规模的计算任务,这些小规模的计算任务可以在不同的计算节点上并行处理。这使得 MapReduce 可以处理非常大规模的数据。

例如,一个简单的MapReduce程序可能会将文本文件分成单词(map),然后计算每个单词的出现次数(reduce)。在这种情况下,Map任务将文本文件切分为单词,并输出每个单词的键值对(键是单词,值是1),然后Reduce任务将这些键值对按照键(单词)合并,并计算每个键(单词)的值(出现次数)的总和。

在真实的环境中,这个模型通常由分布式计算框架(如 Hadoop)来实现,框架会负责任务的调度、数据的分布和容错等工作,使得程序员可以专注于编写处理数据的 Map 和 Reduce 函数。

上demo:

// 导入 IOException 类,用于处理输入/输出错误
import java.io.IOException;// 导入 StringTokenizer 类,用于解析字符串
import java.util.StringTokenizer;// 导入 Hadoop 提供的 Configuration 类,用于 Hadoop 配置设置
import org.apache.hadoop.conf.Configuration;// 导入 Hadoop 提供的 Path 类,处理 Hadoop 文件系统路径
import org.apache.hadoop.fs.Path;// 导入 Hadoop 提供的 IntWritable 和 Text 类,数据类型转换
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;// 导入 Hadoop MapReduce Job 类,用于定义和提交 MapReduce 任务
import org.apache.hadoop.mapreduce.Job;// 从 Hadoop MapReduce 库导入 Mapper 和 Reducer 类
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;// 导入 Hadoop IO 类库,用于 MapReduce 输入输出
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;// 定义 WordCount 类
public class WordCount {// 定义公开的静态 TokenizerMapper 类,继承 Hadoop 的 Mapper 类public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{// 定义一个值为 1 的整型变量,作为每个单词计数的值private final static IntWritable one = new IntWritable(1);// 定义一个 Text 类型的变量,存储每个即将被计数的单词private Text word = new Text();// 重写 map 方法,这个方法会被 MapReduce 框架调用,每读入一行数据调用一次public void map(Object key, Text value, Context context) throws IOException, InterruptedException {// 使用 StringTokenizer 将每行的字符串拆解为单个的单词StringTokenizer itr = new StringTokenizer(value.toString());// 循环单词字符串while (itr.hasMoreTokens()) {// 为 word 对象设值word.set(itr.nextToken());// 将每个单词及其数量(默认为 1)输出context.write(word, one);}}}// 定义公开的静态 IntSumReducer 类,继承 Hadoop 的 Reducer 类public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {// 定义 IntWritable 类型的对象,存储每个单词的计数总和private IntWritable result = new IntWritable();// 重写 reduce 方法,这个方法用于汇总每个单词的数量public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 定义 sum 变量,用于累加每个单词的数量int sum = 0;// 使用 for 循环,遍历当前单词的所有数量值,进行累加for (IntWritable val : values) {sum += val.get();}// 为结果设值result.set(sum);// 将每个单词及其最终计数总和写出context.write(key, result);}}// 定义主方法,运行 MapReduce 任务的入口public static void main(String[] args) throws Exception {// 创建 Hadoop 配置对象Configuration conf = new Configuration();// 创建 Job 实例,设置 Job 名称Job job = Job.getInstance(conf, "word count");// 设置 Jar 文件 这行代码的作用是设置该作业的 jar 文件。在 Hadoop 中执行一个 MapReduce 作业,需要将其打包为一个 jar 文件并上传到 Hadoop 集群中。这里的 WordCount.class 即是你的作业驱动类, Hadoop 将会通过这个类查找包含该类的 jar 文件,然后使用这个 jar 文件执行整个作业。这样设置可以确保作业在 Hadoop 集群中的每个节点上都可以正确执行。job.setJarByClass(WordCount.class);// 设置 Mapper 类 这行代码的作用是设置 MapReduce 作业的 Mapper 类。在 MapReduce 模型中,Mapper 是负责处理输入数据,将输入数据转换为一系列键值对的组件。TokenizerMapper.class 就是你写的处理数据的类,你可以在这个类中定义如何处理和映射输入数据。设置 Mapper 类是进行 MapReduce 作业的关键步骤之一。job.setMapperClass(TokenizerMapper.class);// 设置 Combiner 类  当一个特定的 Mapper 结束处理之后,它会输出很多记录,这些记录中可能有很多键是相同的。如果没有 Combiner,这些记录就会直接传递给 Reducer,这样可能会产生大量的网络 I/O,显著地影响性能。//而有了 Combiner,我们可以在每个 Mapper 所在的节点上先进行一次局部的归约操作,将相同键的值合并后再传输到 Reducer,这样可以大大减少需要传输的数据量,提高任务的运行性能。job.setCombinerClass(IntSumReducer.class);// 设置 Reducer 类及其输出键-值对的类型// 这行代码是设定了该 MapReduce 任务的 Reducer 类是 IntSumReducer。Reducer 类是对所有 Mapper 的输出进行汇总的部分,每一个 MapReduce 任务都需要设定一个 Reducer 类。这里的 IntSumReducer 类定义了如何对数据进行汇总操作。job.setReducerClass(IntSumReducer.class); //这行代码是设定了该 MapReduce 任务输出数据的键的类型是 Text。在 MapReduce 中,数据是以键值对的形式存储和传输的。这行代码指明了输出键的数据类型,确保处理结果的格式正确。job.setOutputKeyClass(Text.class);//这行代码是设定了该 MapReduce 任务输出数据的值的类型是 IntWritable。这是对job.setOutputKeyClass(Text.class)的补充,指明了输出值的数据类型。job.setOutputValueClass(IntWritable.class);// 设置输入数据的路径,取自程序参数FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输出数据的路径,取自程序参数FileOutputFormat.setOutputPath(job, new Path(args[1]));// 等待任务完成后退出程序 这行代码是 MapReduce 任务的最后一步,作用是让主线程等待这个 MapReduce 任务完成。//在这段代码中,waitForCompletion方法将主线程挂起,直到 MapReduce 任务完成为止。这里的参数 true 表示会在控制台上显示这个任务的进度,也就是说你运行这个程序的时候,会在控制台上看到任务的完成进度。//这段代码的含义就是:如果任务成功完成,那么程序正常退出;如果任务执行过程中出现了错误,那么程序异常退出。System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
/**
MapReduce 模型的并发性体现在其 Map 和 Reduce 操作的实现中,这两个操作对数据进行分布式处理。在以上代码片段中,没有一行明确地标示了并发操作,因为并发性质是 MapReduce 框架内部管理的,并非是在业务代码中直接表现出来的。
然而,job.setMapperClass(TokenizerMapper.class); 和 job.setReducerClass(IntSumReducer.class); 
这两行设置了 Mapper 类和 Reducer 类,这两类的实例在运行时会在多个节点上同时处理数据(一个节点处理输入数据的一部分),这便是 MapReduce 的并发性!!!
具体来说:
在 Mapper 阶段,输入的数据集会被划分成多个数据块,每一个数据块会被一个 Mapper 实例负责处理,这些 Mapper 实例是在集群的各个计算节点上同时运行的。
Reducer 阶段同样具有并发性,不同的 Reducer 实例会并发地处理 Mapper 输出的数据集中具有相同键的数据子集。
请注意,这些并发处理是由 Hadoop 框架自动管理的,并不需要在业务代码中明确地进行并发控制,这也是使用这种大数据处理框架的优点之一。
*/

这个程序把一个任务分为两个部分:

  1. Mapper (TokenizerMapper class):接受一行文本,拆分成单词,然后返回一系列的<单词,1>这样的键值对。
  2. Reducer (IntSumReducer class):对所有相同的单词计数总和,并返回<单词,总计数>这样的键值对。

执行这个程序,需要有一个 Hadoop 环境并且把待统计名词的文本文件放到应当的路径,并将结果输出的路径设置为期望的路径进行查看结果。

请注意,这些并发处理是由 Hadoop 框架自动管理的,并不需要在业务代码中明确地进行并发控制,这也是使用这种大数据处理框架的优点之一。

这篇关于用于大规模数据集(大于1TB)的并行运算的MapReduce是怎么实现的?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/889036

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat