2.2.8 hadoop体系之离线计算-mapreduce分布式计算-流量统计之上行流量倒序排序(递减排序)

本文主要是介绍2.2.8 hadoop体系之离线计算-mapreduce分布式计算-流量统计之上行流量倒序排序(递减排序),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.需求分析

2.代码实现

2.1 定义FlowBean:实现WritableComparable实现比较排序

2.2 定义FlowCountSortMapper

2.3 定义FlowCountSortReducer

2.4 定义JobMain:程序main函数

3.运行及结果分析


1.需求分析

按数据某个字段排序输出:可以写两个MapReduce

需求一的输出数据作为排序的输入数据,自定义FlowBean,以FlowBean为map输出的key,以手机号作为Map输出的value,因为MapReduce程序会对Map阶段输出的key进行排序

2.代码实现

2.1 定义FlowBean:实现WritableComparable实现比较排序

Java 的 compareTo 方法说明:

  • compareTo 方法用于将当前对象与方法的参数进行比较
  • 如果指定的数与参数相等返回 0
  • 如果指定的数小于参数返回 -1
  • 如果指定的数大于参数返回 1

例如: o1.compareTo(o2); 返回正数的话,当前对象(调用 compareTo 方法的对象 o1) 要排在比较对象(compareTo 传参对象 o2)后面,返回负数的话,放在前面

package ucas.mapreduce_sort;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements WritableComparable<FlowBean> {private Integer upFlow;private Integer downFlow;private Integer upCountFlow;private Integer downCountFlow;public Integer getUpFlow() {return upFlow;}public void setUpFlow(Integer upFlow) {this.upFlow = upFlow;}public Integer getDownFlow() {return downFlow;}public void setDownFlow(Integer downFlow) {this.downFlow = downFlow;}public Integer getUpCountFlow() {return upCountFlow;}public void setUpCountFlow(Integer upCountFlow) {this.upCountFlow = upCountFlow;}public Integer getDownCountFlow() {return downCountFlow;}public void setDownCountFlow(Integer downCountFlow) {this.downCountFlow = downCountFlow;}@Overridepublic String toString() {returnupFlow +"\t" + downFlow +"\t" + upCountFlow +"\t" + downCountFlow;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeInt(upFlow);dataOutput.writeInt(downFlow);dataOutput.writeInt(upCountFlow);dataOutput.writeInt(downCountFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readInt();this.downFlow = dataInput.readInt();this.upCountFlow = dataInput.readInt();this.downCountFlow = dataInput.readInt();}@Overridepublic int compareTo(FlowBean other) {return this.getUpFlow().compareTo(other.getUpFlow()) * -1;  //按照上行数据包进行排序}
}

2.2 定义FlowCountSortMapper

package ucas.mapreduce_sort;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {FlowBean flowBean = new FlowBean();String[] split = value.toString().split("\t");//获取手机号,作为V2String phoneNum = split[0];//获取其他流量字段,封装flowBean,作为K2flowBean.setUpFlow(Integer.parseInt(split[1]));flowBean.setDownFlow(Integer.parseInt(split[2]));flowBean.setUpCountFlow(Integer.parseInt(split[3]));flowBean.setDownCountFlow(Integer.parseInt(split[4]));//将K2和V2写入上下文中context.write(flowBean, new Text(phoneNum));}
}

2.3 定义FlowCountSortReducer

package ucas.mapreduce_sort;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value, key);}}
}

2.4 定义JobMain:程序main函数

package ucas.mapreduce_sort;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {//创建一个任务对象Job job = Job.getInstance(super.getConf(), "mapreduce_flowcountsort");//打包放在集群运行时,需要做一个配置job.setJarByClass(JobMain.class);//第一步:设置读取文件的类: K1 和V1job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://192.168.0.101:8020/out/flowcount_out"));//第二步:设置Mapper类job.setMapperClass(FlowCountSortMapper.class);//设置Map阶段的输出类型: k2 和V2的类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//第三,四,五,六步采用默认方式(分区,排序,规约,分组)//第七步 :设置文的Reducer类job.setReducerClass(FlowCountSortReducer.class);//设置Reduce阶段的输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置Reduce的个数//第八步:设置输出类job.setOutputFormatClass(TextOutputFormat.class);//设置输出的路径TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.101:8020/out/flowcountsort_out"));boolean b = job.waitForCompletion(true);return b ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//启动一个任务int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}}

3.运行及结果分析

按上行数据包降序排序:

这篇关于2.2.8 hadoop体系之离线计算-mapreduce分布式计算-流量统计之上行流量倒序排序(递减排序)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754518

相关文章

Python如何计算两个不同类型列表的相似度

《Python如何计算两个不同类型列表的相似度》在编程中,经常需要比较两个列表的相似度,尤其是当这两个列表包含不同类型的元素时,下面小编就来讲讲如何使用Python计算两个不同类型列表的相似度吧... 目录摘要引言数字类型相似度欧几里得距离曼哈顿距离字符串类型相似度Levenshtein距离Jaccard相

Spring排序机制之接口与注解的使用方法

《Spring排序机制之接口与注解的使用方法》本文介绍了Spring中多种排序机制,包括Ordered接口、PriorityOrdered接口、@Order注解和@Priority注解,提供了详细示例... 目录一、Spring 排序的需求场景二、Spring 中的排序机制1、Ordered 接口2、Pri

大数据小内存排序问题如何巧妙解决

《大数据小内存排序问题如何巧妙解决》文章介绍了大数据小内存排序的三种方法:数据库排序、分治法和位图法,数据库排序简单但速度慢,对设备要求高;分治法高效但实现复杂;位图法可读性差,但存储空间受限... 目录三种方法:方法概要数据库排序(http://www.chinasem.cn对数据库设备要求较高)分治法(常

使用C#代码计算数学表达式实例

《使用C#代码计算数学表达式实例》这段文字主要讲述了如何使用C#语言来计算数学表达式,该程序通过使用Dictionary保存变量,定义了运算符优先级,并实现了EvaluateExpression方法来... 目录C#代码计算数学表达式该方法很长,因此我将分段描述下面的代码片段显示了下一步以下代码显示该方法如

使用zabbix进行监控网络设备流量

《使用zabbix进行监控网络设备流量》这篇文章主要为大家详细介绍了如何使用zabbix进行监控网络设备流量,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录安装zabbix配置ENSP环境配置zabbix实行监控交换机测试一台liunx服务器,这里使用的为Ubuntu22.04(

Python中lambda排序的六种方法

《Python中lambda排序的六种方法》本文主要介绍了Python中使用lambda函数进行排序的六种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录1.对单个变量进行排序2. 对多个变量进行排序3. 降序排列4. 单独降序1.对单个变量进行排序

关于Java内存访问重排序的研究

《关于Java内存访问重排序的研究》文章主要介绍了重排序现象及其在多线程编程中的影响,包括内存可见性问题和Java内存模型中对重排序的规则... 目录什么是重排序重排序图解重排序实验as-if-serial语义内存访问重排序与内存可见性内存访问重排序与Java内存模型重排序示意表内存屏障内存屏障示意表Int

opencv实现像素统计的示例代码

《opencv实现像素统计的示例代码》本文介绍了OpenCV中统计图像像素信息的常用方法和函数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 统计像素值的基本信息2. 统计像素值的直方图3. 统计像素值的总和4. 统计非零像素的数量

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

如何使用 Bash 脚本中的time命令来统计命令执行时间(中英双语)

《如何使用Bash脚本中的time命令来统计命令执行时间(中英双语)》本文介绍了如何在Bash脚本中使用`time`命令来测量命令执行时间,包括`real`、`user`和`sys`三个时间指标,... 使用 Bash 脚本中的 time 命令来统计命令执行时间在日常的开发和运维过程中,性能监控和优化是不