Mapreduce的排序(全局排序、分区加排序、Combiner优化)

2023-12-14 06:18

本文主要是介绍Mapreduce的排序(全局排序、分区加排序、Combiner优化),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、MR排序的分类

  1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

  2.全局排序;

  3.辅助排序:再第一次排序后经过分区再排序一次;

  4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

 

二、MR排序的接口——WritableComparable

  该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

 

三、流量统计案例的排序与分区

/*** @author: PrincessHug* @date: 2019/3/24, 15:36* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FlowSortBean implements WritableComparable<FlowSortBean> {private long upFlow;private long dwFlow;private long flowSum;public FlowSortBean() {}public FlowSortBean(long upFlow, long dwFlow) {this.upFlow = upFlow;this.dwFlow = dwFlow;this.flowSum = upFlow + dwFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow = dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum = flowSum;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dwFlow = in.readLong();flowSum = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + dwFlow + "\t" + flowSum;}@Overridepublic int compareTo(FlowSortBean o) {return this.flowSum > o.getFlowSum() ? -1:1;}
}public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line = value.toString();//切分数据String[] fields = line.split("\t");//封装数据long upFlow = Long.parseLong(fields[1]);long dwFlow = Long.parseLong(fields[2]);//传输数据context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));}
}public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {@Overrideprotected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(),key);}
}public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {@Overridepublic int getPartition(FlowSortBean key, Text value, int i) {String phoneNum = value.toString().substring(0, 3);int partition = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partition;}
}public class FlowSortDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//设置配置,初始化Job类Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置执行类job.setJarByClass(FlowSortDriver.class);//设置Mapper、Reducer类job.setMapperClass(FlowSortMapper.class);job.setReducerClass(FlowSortReducer.class);//设置Mapper输出数据类型job.setMapOutputKeyClass(FlowSortBean.class);job.setMapOutputValueClass(Text.class);//设置Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowSortBean.class);//设置自定义分区job.setPartitionerClass(FlowSortPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出类型FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{protected void reduce(Text key,Iterable<IntWritable> values,Context context){//计数int count = 0;//累加求和for(IntWritable v:values){count += v.get();}//输出context.write(key,new IntWritable(count));}
}

  然后再Driver类中设置使用Combiner类

job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10591267.html

这篇关于Mapreduce的排序(全局排序、分区加排序、Combiner优化)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491430

相关文章

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

Mysql表如何按照日期字段的年月分区

《Mysql表如何按照日期字段的年月分区》:本文主要介绍Mysql表如何按照日期字段的年月分区的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、创键表时直接设置分区二、已有表分区1、分区的前置条件2、分区操作三、验证四、注意总结一、创键表时直接设置分区

Python通过模块化开发优化代码的技巧分享

《Python通过模块化开发优化代码的技巧分享》模块化开发就是把代码拆成一个个“零件”,该封装封装,该拆分拆分,下面小编就来和大家简单聊聊python如何用模块化开发进行代码优化吧... 目录什么是模块化开发如何拆分代码改进版:拆分成模块让模块更强大:使用 __init__.py你一定会遇到的问题模www.

Mybatis 传参与排序模糊查询功能实现

《Mybatis传参与排序模糊查询功能实现》:本文主要介绍Mybatis传参与排序模糊查询功能实现,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、#{ }和${ }传参的区别二、排序三、like查询四、数据库连接池五、mysql 开发企业规范一、#{ }和${ }传参的

SpringBoot首笔交易慢问题排查与优化方案

《SpringBoot首笔交易慢问题排查与优化方案》在我们的微服务项目中,遇到这样的问题:应用启动后,第一笔交易响应耗时高达4、5秒,而后续请求均能在毫秒级完成,这不仅触发监控告警,也极大影响了用户体... 目录问题背景排查步骤1. 日志分析2. 性能工具定位优化方案:提前预热各种资源1. Flowable

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.