hadooper-Hadoop中的各种排序

2024-04-14 19:48
文章标签 排序 hadoop hadooper

本文主要是介绍hadooper-Hadoop中的各种排序,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1:shuffle阶段的排序(部分排序)

shuffle阶段的排序可以理解成两部分,一个是对spill进行分区时,由于一个分区包含多个key值,所以要对分区内的<key,value>按照key进行排序,即key值相同的一串<key,value>存放在一起,这样一个partition内按照key值整体有序了。

第二部分并不是排序,而是进行merge,merge有两次,一次是map端将多个spill 按照分区和分区内的key进行merge,形成一个大的文件。第二次merge是在reduce端,进入同一个reduce的多个map的输出 merge在一起,该merge理解起来有点复杂,最终不是形成一个大文件,而且期间数据在内存和磁盘上都有,关于这点金子准备日后单独整理一下。

所以shuffle阶段的merge并不是严格的排序意义,只是将多个整体有序的文件merge成一个大的文件,由于同的task执行,map的输出会有所不同,所以merge后的结果不是每次都相同,不过还是严格要求按照分区划分,同时每个分区内的具有相同key的<key,value>对挨在一起。


shuffle排序综述:如果只定义了map函数,没有定义reduce函数,那么输入数据经过shuffle的排序后,结果为key值相同的输出挨在一起,且key值小的一定在前面,这样整体来看key值有序(宏观意义的,不一定是按从大到小,因为如果采用默认的HashPartitioner,则key 的hash值相等的在一个分区,如果key为IntWritable的话,每个分区内的key会排序好的),而每个key对应的value不是有序的。

应用一:金子理解:shuffle的排序随不能满足全局排序,但是实际中还是帮助我们做了很多工作,比如我们只希望把<key,value>对按照key值,将相同key的<key,value>对输出到一起,这样shuffle排序就可以满足了,也就不需要reduce函数,只单独指定map函数就OK啦!

应用二:基于分区的MapFile查找技术。(我没仔细看)


2:全排序

对于全排序,金子深有体会,借助于hadoop的Terasort,我曾经写了整数和字符串的全排序,其中代码重叠率很高,只注意改改输入格式什么的就OK了。要进行全局排序,首先要理解分区的概念,并且要使用TotalOrderpartition(因为默认的partition是hashpartition,不适用于全局排序)。主要思路就是将数据按照区间进行分割,比如对整数排序,[0,10000]的在partiiton 0中,(10000,20000]在partition 1中。。。这样排序后面的partition中的数据肯定比排在前面的partition中的数要大,宏观上看是有序的,然后在对每个分区中的数据进行排序,由于这时分区中数据量已经比较小了,在进行排序就容易的多了。在数据分布均匀的情况下,每个分区内的数据量基本相同,这种就是比较理想的情况了,但是实际中数据往往分布不均匀,出现了数据倾斜的情况,这时按照之前的分区划分数据就不合适了,此时就需要一个东西的帮助——采样器。采样的核心思想是只查看一小部分键,获得键的近似分布,并由此键分区。关于采样器的一些使用细节,可以查看我的另一篇博客:Hadoop 中的采样器-不一样的视角 Hadoop中的采样器——不一样的视角

典型应用:TeraSort


3:二次排序

也称作辅助排序,MapReduce框架在把记录到达reducers之前会将记录按照键排序。对于任意一个特殊的键,然而,值是不排序的。甚至是,值在两次执行中的顺序是不一样的,原因是它们是从不同的map中来的,这些不同的map可能在不同的执行过程中结束的先后顺序不确定。通常情况下,大多数的MapReduce程序的reduce函数不会依赖于值的顺序。然而,我们也可通过以一种特殊的方式排序和分组键,来指定值的顺序。

二次排序金子并没有使用过,不过在join连接操作中,输入到一个reduce中的value<list>是来自两个表的,如果进行排序,将第一个表的放在前面,第二个表的放在后面,这样就只需要将表1存放到ArrayList中,表二不需要,然后进行全连接就搞定啦,这样是很多关于并行数据库的论文中对join操作的一个明显优化。

看到一篇写的很好的分析二次排序的博客:http://blog.sina.com.cn/s/blog_70324d8d0100wa63.html,讲的是日志分析中二次排序是怎么应用的。要写二次排序,则需要非常熟悉Jobconf的几个函数,以及各自相关的类:

A:setOutputKeyComparatorClass  :参数为继承RawComparator的子类

public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
RawComparator接口:继承自Comparator ,就是一个比较器,直接在代表对象特征的字节上进行操作。经常使用的其实现的类有:DoubleWritable.ComparatorFloatWritable.ComparatorIntWritable.Comparator LongWritable.ComparatorNullWritable.ComparatorSecondarySort.FirstGroupingComparatorSecondarySort.IntPair.Comparator,Text.Comparator,WritableComparator

自定义的类要实现compare函数。以上这部分的代码就是对组合键中的key进行排序的意思。

很多的二次排序例子中就利用继承WritableComparator来实现根据组合键进行排序。即将compare中的两个参数转换为组合键,组合键中的自然键不同时按照自然键排序,自然键相同时按照自然值排序。可参见《权威指南》中p243的代码。


B:setPartitionerClass:用于指定用于分区的类,参数我继承Partitioner的类

public void setPartitionerClass(Class<? extends Partitioner> theClass)
Deprecated. 
Set the Partitioner class used to partition Mapper-outputs to be sent to the Reducers.
设置分区的类目的就是用于将map的输出按照指定的规则进行分区,每个分区进入不同的reduce,每个分区中按照自己程序的要求,可以有多个key值,如果一般的分区如Hashpartitioner 和TotalOrderPartitioner不能满足需求时,需要自定义继承Partitioner的类。

二次排序中由于map的输出key为组合键IntPair,所以自定义的分区类要继承Partitioner,同时函数getPartition 的返回值要根据组合键中的自然键,即key.first进行判断,例如return Math.abs(key.getFirst()*127)% numpartitions; 返回值相同的就被分配到一个分区中了。这样一个分区中有多个不同的自然键,但是reduce的输入要满足对于非组合键,就是单纯的一个自然键时,输入是 <key, value<list>>的形式,而现在全部都是<IntPair ,value><IntPair,value>....的形式,我们要将自然键相同的value放到一起,形成一个list,要怎么办呢,就要进行分组啦!!!分组也同样要用comparator,见下面C哦~


C:setOutputValueGroupingComparator:指定用户自定义的comparator,用于将reduce的输入进行分组,二次排序中可以理解为将自然键key相同的放到一起,相同key的value放到一个value迭代器里。

public void setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)


二次排序工作原理综述:(转自:http://p-x1984.iteye.com/blog/800269)

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。Hadoop自带的例子SecondrySort使用TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。 

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

二次排序应用: 日志分析(转自:)
在计算visits的时候是一个很常见的需求。比如
原始日志为
用户标识为11111的 1点00 访问页面page1
用户标识为11111的 1点01 访问页面page2
用户标识为11111的 1点05 访问页面page3
用户标识为22222的 1点00 访问页面page1
用户标识为22222的 1点05 访问页面page3,


你需要统计结果为
用户11111 1:00的入口是 page1,出口是page3,访问3个页面,停留时间为5分钟
用户22222 1点00的入口是page1,出口是page3,访问2个页面,停留时间为5分钟
3 实步


实现简要步骤为
1. 构造(用户标识,时间)作为key, 时间和其他信息(比如访问页面)作为value,然后进入map流程
2. 在缺省的reduce的,传入参数为 单个key和value的集合,这会导致相同的用户标识和相同的时间被分在同一组,比如用户标识为11111的 1点00一个reduce, 用户标识为11111的 1点01另外一组,这不符合要求.所以需要更改缺省分组,需要由原来的按(用户标识,时间)改成按(用户标识)分组就行了。这样reduce是传入参数变为
户标识为11111 的value集合为(1点00 访问页面page1, 1点01 访问页面page2, 1点05 访问页面page3),然后在reduce方法里写自己的统计逻辑就行了。
3. 当然1和2步之间,有2个重要细节要处理:确定key的排序规则和确定分区规则(分区规则保证map后分配数据到reduce按照用户标识来散列,而不是按缺省的用户标识+时间来散列)

这篇关于hadooper-Hadoop中的各种排序的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/903850

相关文章

Python中lambda排序的六种方法

《Python中lambda排序的六种方法》本文主要介绍了Python中使用lambda函数进行排序的六种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录1.对单个变量进行排序2. 对多个变量进行排序3. 降序排列4. 单独降序1.对单个变量进行排序

关于Java内存访问重排序的研究

《关于Java内存访问重排序的研究》文章主要介绍了重排序现象及其在多线程编程中的影响,包括内存可见性问题和Java内存模型中对重排序的规则... 目录什么是重排序重排序图解重排序实验as-if-serial语义内存访问重排序与内存可见性内存访问重排序与Java内存模型重排序示意表内存屏障内存屏障示意表Int

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

usaco 1.3 Mixing Milk (结构体排序 qsort) and hdu 2020(sort)

到了这题学会了结构体排序 于是回去修改了 1.2 milking cows 的算法~ 结构体排序核心: 1.结构体定义 struct Milk{int price;int milks;}milk[5000]; 2.自定义的比较函数,若返回值为正,qsort 函数判定a>b ;为负,a<b;为0,a==b; int milkcmp(const void *va,c

hdu 1285(拓扑排序)

题意: 给各个队间的胜负关系,让排名次,名词相同按从小到大排。 解析: 拓扑排序是应用于有向无回路图(Direct Acyclic Graph,简称DAG)上的一种排序方式,对一个有向无回路图进行拓扑排序后,所有的顶点形成一个序列,对所有边(u,v),满足u 在v 的前面。该序列说明了顶点表示的事件或状态发生的整体顺序。比较经典的是在工程活动上,某些工程完成后,另一些工程才能继续,此时

《数据结构(C语言版)第二版》第八章-排序(8.3-交换排序、8.4-选择排序)

8.3 交换排序 8.3.1 冒泡排序 【算法特点】 (1) 稳定排序。 (2) 可用于链式存储结构。 (3) 移动记录次数较多,算法平均时间性能比直接插入排序差。当初始记录无序,n较大时, 此算法不宜采用。 #include <stdio.h>#include <stdlib.h>#define MAXSIZE 26typedef int KeyType;typedef char In