Mapreduce的排序(全局排序、分区加排序、Combiner优化)

2023-12-14 06:18

本文主要是介绍Mapreduce的排序(全局排序、分区加排序、Combiner优化),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、MR排序的分类

  1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

  2.全局排序;

  3.辅助排序:再第一次排序后经过分区再排序一次;

  4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

 

二、MR排序的接口——WritableComparable

  该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

 

三、流量统计案例的排序与分区

/*** @author: PrincessHug* @date: 2019/3/24, 15:36* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FlowSortBean implements WritableComparable<FlowSortBean> {private long upFlow;private long dwFlow;private long flowSum;public FlowSortBean() {}public FlowSortBean(long upFlow, long dwFlow) {this.upFlow = upFlow;this.dwFlow = dwFlow;this.flowSum = upFlow + dwFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow = dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum = flowSum;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dwFlow = in.readLong();flowSum = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + dwFlow + "\t" + flowSum;}@Overridepublic int compareTo(FlowSortBean o) {return this.flowSum > o.getFlowSum() ? -1:1;}
}public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line = value.toString();//切分数据String[] fields = line.split("\t");//封装数据long upFlow = Long.parseLong(fields[1]);long dwFlow = Long.parseLong(fields[2]);//传输数据context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));}
}public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {@Overrideprotected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(),key);}
}public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {@Overridepublic int getPartition(FlowSortBean key, Text value, int i) {String phoneNum = value.toString().substring(0, 3);int partition = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partition;}
}public class FlowSortDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//设置配置,初始化Job类Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置执行类job.setJarByClass(FlowSortDriver.class);//设置Mapper、Reducer类job.setMapperClass(FlowSortMapper.class);job.setReducerClass(FlowSortReducer.class);//设置Mapper输出数据类型job.setMapOutputKeyClass(FlowSortBean.class);job.setMapOutputValueClass(Text.class);//设置Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowSortBean.class);//设置自定义分区job.setPartitionerClass(FlowSortPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出类型FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{protected void reduce(Text key,Iterable<IntWritable> values,Context context){//计数int count = 0;//累加求和for(IntWritable v:values){count += v.get();}//输出context.write(key,new IntWritable(count));}
}

  然后再Driver类中设置使用Combiner类

job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10591267.html

这篇关于Mapreduce的排序(全局排序、分区加排序、Combiner优化)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491430

相关文章

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

MySQL数据库实现批量表分区完整示例

《MySQL数据库实现批量表分区完整示例》通俗地讲表分区是将一大表,根据条件分割成若干个小表,:本文主要介绍MySQL数据库实现批量表分区的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录一、表分区条件二、常规表和分区表的区别三、表分区的创建四、将既有表转换分区表脚本五、批量转换表为分区

Java List排序实例代码详解

《JavaList排序实例代码详解》:本文主要介绍JavaList排序的相关资料,Java排序方法包括自然排序、自定义排序、Lambda简化及多条件排序,实现灵活且代码简洁,文中通过代码介绍的... 目录一、自然排序二、自定义排序规则三、使用 Lambda 表达式简化 Comparator四、多条件排序五、

JAVA数组中五种常见排序方法整理汇总

《JAVA数组中五种常见排序方法整理汇总》本文给大家分享五种常用的Java数组排序方法整理,每种方法结合示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录前言:法一:Arrays.sort()法二:冒泡排序法三:选择排序法四:反转排序法五:直接插入排序前言:几种常用的Java数组排序

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据