Mapreduce的排序(全局排序、分区加排序、Combiner优化)

2023-12-14 06:18

本文主要是介绍Mapreduce的排序(全局排序、分区加排序、Combiner优化),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、MR排序的分类

  1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

  2.全局排序;

  3.辅助排序:再第一次排序后经过分区再排序一次;

  4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

 

二、MR排序的接口——WritableComparable

  该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

 

三、流量统计案例的排序与分区

/*** @author: PrincessHug* @date: 2019/3/24, 15:36* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FlowSortBean implements WritableComparable<FlowSortBean> {private long upFlow;private long dwFlow;private long flowSum;public FlowSortBean() {}public FlowSortBean(long upFlow, long dwFlow) {this.upFlow = upFlow;this.dwFlow = dwFlow;this.flowSum = upFlow + dwFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow = dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum = flowSum;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dwFlow = in.readLong();flowSum = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + dwFlow + "\t" + flowSum;}@Overridepublic int compareTo(FlowSortBean o) {return this.flowSum > o.getFlowSum() ? -1:1;}
}public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line = value.toString();//切分数据String[] fields = line.split("\t");//封装数据long upFlow = Long.parseLong(fields[1]);long dwFlow = Long.parseLong(fields[2]);//传输数据context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));}
}public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {@Overrideprotected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(),key);}
}public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {@Overridepublic int getPartition(FlowSortBean key, Text value, int i) {String phoneNum = value.toString().substring(0, 3);int partition = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partition;}
}public class FlowSortDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//设置配置,初始化Job类Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置执行类job.setJarByClass(FlowSortDriver.class);//设置Mapper、Reducer类job.setMapperClass(FlowSortMapper.class);job.setReducerClass(FlowSortReducer.class);//设置Mapper输出数据类型job.setMapOutputKeyClass(FlowSortBean.class);job.setMapOutputValueClass(Text.class);//设置Reducer输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowSortBean.class);//设置自定义分区job.setPartitionerClass(FlowSortPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出类型FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\flowsort\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\flowsort\\partitionout"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

 

四、Combiner合并

  Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

  Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

  例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{protected void reduce(Text key,Iterable<IntWritable> values,Context context){//计数int count = 0;//累加求和for(IntWritable v:values){count += v.get();}//输出context.write(key,new IntWritable(count));}
}

  然后再Driver类中设置使用Combiner类

job.setCombinerClass(WordCountCombiner.class);

  如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

job.setCombinerClass(WordCountReducer.class);

 

  注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

  mapper输出两个分区:3,5,7  =>avg=5

            2,6    =>avg=4

  reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

  所以在使用Combiner时要注意其不会影响最中的结果!!!

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10591267.html

这篇关于Mapreduce的排序(全局排序、分区加排序、Combiner优化)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491430

相关文章

Spring排序机制之接口与注解的使用方法

《Spring排序机制之接口与注解的使用方法》本文介绍了Spring中多种排序机制,包括Ordered接口、PriorityOrdered接口、@Order注解和@Priority注解,提供了详细示例... 目录一、Spring 排序的需求场景二、Spring 中的排序机制1、Ordered 接口2、Pri

Deepseek使用指南与提问优化策略方式

《Deepseek使用指南与提问优化策略方式》本文介绍了DeepSeek语义搜索引擎的核心功能、集成方法及优化提问策略,通过自然语言处理和机器学习提供精准搜索结果,适用于智能客服、知识库检索等领域... 目录序言1. DeepSeek 概述2. DeepSeek 的集成与使用2.1 DeepSeek API

springboot日期格式化全局LocalDateTime详解

《springboot日期格式化全局LocalDateTime详解》文章主要分析了SpringBoot中ObjectMapper对象的序列化和反序列化过程,并具体探讨了日期格式化问题,通过分析Spri... 目录分析ObjectMapper与jsonSerializer结论自定义日期格式(全局)扩展利用配置

Tomcat高效部署与性能优化方式

《Tomcat高效部署与性能优化方式》本文介绍了如何高效部署Tomcat并进行性能优化,以确保Web应用的稳定运行和高效响应,高效部署包括环境准备、安装Tomcat、配置Tomcat、部署应用和启动T... 目录Tomcat高效部署与性能优化一、引言二、Tomcat高效部署三、Tomcat性能优化总结Tom

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

大数据小内存排序问题如何巧妙解决

《大数据小内存排序问题如何巧妙解决》文章介绍了大数据小内存排序的三种方法:数据库排序、分治法和位图法,数据库排序简单但速度慢,对设备要求高;分治法高效但实现复杂;位图法可读性差,但存储空间受限... 目录三种方法:方法概要数据库排序(http://www.chinasem.cn对数据库设备要求较高)分治法(常

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J