MapReduce (Shuffle,partition,combiner,Spill )

2024-05-08 19:38

本文主要是介绍MapReduce (Shuffle,partition,combiner,Spill ),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、shuffle介绍

1 、shuffle就是洗牌弄乱的意思,shuffle代表map 输出 到reduce 的整个过程,他解决的问题就是如何将多个map task的输出,作为多个reduce task 的输入,下面就来看看shffler是如何对map的输出结果排序,处理,分组成reduce的输入的。

2 、shuffle和partition和combiner的关系是包含关系,shuffle过程包含partition

二、shuffle的整个流程

2.1 介绍

1 shuffle整个过程可以分两部分,一部分是在map端,一部分是在reduce端

2 当我们跑mapreduce时候,maptask和reducetask大部分情况是分布在不同的节点上的,这时候reducetask执行需要通过网络去copy其他节点上的maptask任务的结果,这对网络的要求是非常大的,所以我们期望shuffle过程目标是

1 将map的输出完整的传输达到reduce端

2 尽可能减少传输到来消耗

3 降低磁盘I/O

2.2 shuffle过程

Map端的 Shuffle 阶段

2.2.1输入数据

          数据分片(默认128M,HDFS一个块的大小),MAP处理(文章中每个词出现一次记录一次,key:词,value:1),MAP输出会先输出到内存上。

2.2.2partition函数

          在写入缓存前,会调用partition函数,partition会根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理分组,最后写到硬盘上。(partition过程后面会详细说)

2.2.3 Spill 阶段

            每个 Map 任务都有一个用来写入输出数据的循环内存缓冲区(默认大小为 100MB),当缓冲区中的数据量达到一个特定阈值(默认是 80%)时,系统将会启动一个后台线程,把缓冲 区中的内容写到磁盘中(即 Spill 阶段)。在写磁盘过程中,Map 输出继续被写到缓冲区中,但如果在此期间缓冲区被填满,那么 Map 任务就会阻塞直到写磁盘过程完成。

2.2.3combiner 操作

          可有可无,combiner 是一种特殊的reduce,他的作用在于优化map和reduce之间的数据传输数量,比如查文章中单词出现的次数,mao执行完了都是一个词,编辑标记一个1,3combiner 操作可以把同样的词进行累加,map输出数据根据分区排序完成后,在写入文件之前会执行一次combine操作(前提是作业中设置了这个操作);

2.2.5 Sort 阶段

          多路归并排序 虽然已经partition函数进行分组,但是还需要对输出内容进行排序,方便后面处理

2.2.5 copy阶段

          map任务执行完成, 写入文件结束后Reduce任务就会来复制任务需要的数据,在 TaskTracker 的本地磁盘上的数据会被多个Reduce任务复制(是哪个任务就数据就复制哪一块,partition任务已经分好

Reduce 端的 Shuffle 阶段

2.2.7 Merge 阶段

        将 Map 端复制过来的数据先放入内存缓冲区中,Merge 有 3 种形式,分别是内存到内存,内存到磁盘,磁盘到磁盘。默认情况下,第一种形式不启用,第二种形式一直在运行(Spill 阶段),直到结束,第三种形式生成最终的文件(Merge 阶段)。

2.2.8 Reduce 阶段

        最终文件可能存在于磁盘中,也可能存在于内存中,但是默认情况下是位于磁盘中的。当 Reduce 的输入文件已定,整个 Shuffle 阶段就结束了,然后就是 Reduce 执行,把结果放到 HDFS 中(Reduce 阶段)。

2.3 partition过程

2.2.2已经简单介绍了,partition其实就确认数据应该被分到哪个reduce上,那partition是怎么分的呢?默认的算法which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

key.hashCode() & Integer.MAX_VALUE    &运算避免出现负值,然后对结果进行取余。

当然也可以extends Partitioner自定义partition函数

2.4combiner 过程

为了减少优化网络传输,在写入文件之前会执行一次combine操作,打个比方,统计文章单词出现的次数。Map 执行完了之后是每个词记录一次,例如:key: hi   value:1   ,这时候hi这个次如果出现多次占用空间就比较大,combine操作可以汇总这些词的词数 ,最后如果有100个hi ,汇总后就是key: hi   value:100 大大减少了网络的传输。

 

这篇关于MapReduce (Shuffle,partition,combiner,Spill )的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/971253

相关文章

MapReduce程序设计2

要求 1、数据集stock-daily,包含A股近4000只股票的今年以来的日数据;数据集stock-daily-30d仅包含最近30个交易日数据,根据自己计算机性能选择。 数据来源:https://www.joinquant.com/help/api/help?name=JQData 2、数据集stock-concept,包含A股近4000只股票所有的股票代码、名称和概念。 数据来源:万

MapReduce 实践题:Web 访问日志分析与异常检测

文章目录 作业描述MapReduce 实践题:Web 访问日志分析与异常检测题目背景数据集说明任务要求输入数据示例输出数据示例实现步骤 解题思路1. 数据预处理2. 访问统计3. 异常检测4. 主方法5. 结果输出 作业描述 MapReduce 实践题:Web 访问日志分析与异常检测 题目背景 你被要求设计和实现一个基于 MapReduce 的大规模 Web 访问日志分析

count(distinct ...) over (partition by...) 替换成mysql

你这个是用了 Oracle 的分析函数。 SQL Server 是不支持的。如果语句比较简单的。例如SELECT COUNT( distinct A) OVER ( partition by B) FROM C可以修改为:SELECT COUNT( distinct A) FROM CGROUP BY B但是如果你的逻辑很复杂的话,那就麻烦了。

计算质数通过分区(Partition)提高Spark的运行性能

在Sortable公司,很多数据处理的工作都是使用Spark完成的。在使用Spark的过程中他们发现了一个能够提高Spark job性能的一个技巧,也就是修改数据的分区数,本文将举个例子并详细地介绍如何做到的。 查找质数   比如我们需要从2到2000000之间寻找所有的质数。我们很自然地会想到先找到所有的非质数,剩下的所有数字就是我们要找的质数。   我们首先遍历2到2000000之间的每个数

MapReduce入门理解

两张图反复的看对比,然后结合网上的 例子  ,相信你会理解 map reduce 这种编程“模型”  模型这个词用的很精辟。

在MapReduce中利用MultipleOutputs输出多个文件

最近在学习Hadoop,由于用到要将reduce结果输出到多个文档中,所以研究了一下MultipleOutputs用法,在这里总结一下。 首先我用到的例子是将原始数据按国家分类成不同的文档,数据是从网上拷贝下来的,如下: 18.217.167.70 United States206.96.54.107 United States196.109.151.139 Mauritius1

tf.train.batch和tf.train.shuffle_batch的理解

capacity是队列的长度 min_after_dequeue是出队后,队列至少剩下min_after_dequeue个数据 假设现在有个test.tfrecord文件,里面按从小到大顺序存放整数0~100 1. tf.train.batch是按顺序读取数据,队列中的数据始终是一个有序的队列, 比如队列的capacity=20,开始队列内容为0,1,..,19=>读取10条记录后,队列剩下10,

Hadoop3:MapReduce中Reduce阶段自定义OutputFormat逻辑

一、情景描述 我们知道,在MapTask阶段开始时,需要InputFormat来读取数据 而在ReduceTask阶段结束时,将处理完成的数据,输出到磁盘,此时就要用到OutputFormat 在之前的程序中,我们都没有设置过这部分配置 所以,采用的是默认输出格式:TextOutputFormat 在实际工作中,我们的输出不一定是到磁盘,可能是输出到MySQL、HBase等 那么,如何实现

Ubuntu14.04安装 error:no such partition

参考: Ubuntu 开机出现 grub rescue> 终端模式修复方法:http://www.linuxidc.com/Linux/2012-07/65249.htm Windows&Ubuntu双系统一键Ghost,提示grub rescue的解决方法:http://www.linuxidc.com/Linux/2015-01/111189.htm #############

Hadoop3:MapReduce中实现自定义排序

一、场景描述 以统计号码的流量案例为基础,进行开发。 流量统计结果 我们现在要对这个数据的总流量进行自定义排序。 二、代码实现 我们要对总流量进行排序,就是对FlowBean中的sumFlow字段进行排序。 所以,我们需要让FlowBean实现WritableComparable接口,并重写compareTo方法。 另外,我们知道,排序是在Shuffle过程进行的,且是在环形缓冲区进行