This article covers 2.2.8 of the Hadoop series: offline computing with MapReduce, specifically traffic statistics sorted by upstream flow in descending order. It is intended as a practical reference for developers; follow along to work through the full implementation.
Contents
1. Requirements analysis
2. Implementation
2.1 Define FlowBean: implement WritableComparable for comparison-based sorting
2.2 Define FlowCountSortMapper
2.3 Define FlowCountSortReducer
2.4 Define JobMain: the program's main entry point
3. Running the job and analyzing the results
1. Requirements analysis
To sort output by a particular field, you can chain two MapReduce jobs: the first aggregates the flow statistics, and the second sorts them.
Use the output of the first job (requirement one, the flow-count job) as the sort job's input. Define a custom FlowBean and make it the map output key, with the phone number as the map output value. This works because the MapReduce framework sorts map output by key during the shuffle, so giving FlowBean a custom comparison method yields the desired ordering.
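The idea above can be sketched without Hadoop at all: if the flow value is made the sort key, ordering the keys orders the records. A minimal plain-Java sketch (the phone numbers and counts are hypothetical sample values, not from the source data):

```java
import java.util.*;

public class KeySortSketch {
    // Sort (phone, flow) pairs by flow descending -- mimics what the shuffle
    // does when the flow value is made the map output key.
    public static List<String> sortByFlowDesc(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>(counts.entrySet());
        pairs.sort((a, b) -> b.getValue().compareTo(a.getValue())); // descending by flow
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : pairs) {
            out.add(e.getValue() + "\t" + e.getKey());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("13560439658", 33); // hypothetical upstream packet counts
        counts.put("13480253104", 3);
        counts.put("13602846565", 15);
        sortByFlowDesc(counts).forEach(System.out::println);
    }
}
```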
2. Implementation
2.1 Define FlowBean: implement WritableComparable for comparison-based sorting
Notes on Java's compareTo method:
- compareTo compares the current object with the method's argument
- it returns 0 if the two are equal
- it returns a negative integer if the current object is less than the argument
- it returns a positive integer if the current object is greater than the argument
For example, with o1.compareTo(o2): a positive result means the current object (o1, the one calling compareTo) sorts after the argument (o2), while a negative result means it sorts before.
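Negating compareTo's result reverses the natural order, which is exactly the trick FlowBean uses below with `* -1`. A small sketch (the sample counts are hypothetical):

```java
import java.util.Arrays;

public class DescendingCompareDemo {
    // Negating compareTo's result reverses the natural (ascending) order,
    // the same technique FlowBean.compareTo uses for descending sort.
    static int descending(Integer a, Integer b) {
        return a.compareTo(b) * -1;
    }

    public static void main(String[] args) {
        Integer[] upFlows = {3, 33, 15}; // hypothetical upstream packet counts
        Arrays.sort(upFlows, DescendingCompareDemo::descending);
        System.out.println(Arrays.toString(upFlows)); // prints [33, 15, 3]
    }
}
```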
package ucas.mapreduce_sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }

    // Serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(downFlow);
        dataOutput.writeInt(upCountFlow);
        dataOutput.writeInt(downCountFlow);
    }

    // Deserialization: read the fields in the same order they were written
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readInt();
        this.downFlow = dataInput.readInt();
        this.upCountFlow = dataInput.readInt();
        this.downCountFlow = dataInput.readInt();
    }

    // Sort by upstream packet count; multiplying by -1 makes it descending
    @Override
    public int compareTo(FlowBean other) {
        return this.getUpFlow().compareTo(other.getUpFlow()) * -1;
    }
}
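The write/readFields pair above must serialize and deserialize the fields in the same order. That round trip can be sketched with plain java.io streams, no Hadoop dependency needed (the flow values are hypothetical):

```java
import java.io.*;

public class WritableRoundTrip {
    // Serialize then deserialize ints in a fixed field order, mirroring
    // FlowBean.write / FlowBean.readFields (which must agree on that order).
    public static int[] roundTrip(int[] fields) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int f : fields) {
            out.writeInt(f); // write in declaration order, like write()
        }
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int[] back = new int[fields.length];
        for (int i = 0; i < back.length; i++) {
            back[i] = in.readInt(); // read in the SAME order, like readFields()
        }
        return back;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical field values: upFlow, downFlow, upCountFlow, downCountFlow
        int[] restored = roundTrip(new int[]{33, 27, 2481, 24681});
        System.out.println(restored[0] + "\t" + restored[1] + "\t" + restored[2] + "\t" + restored[3]);
    }
}
```

Reading the fields back in a different order would silently swap their values, which is why the two methods are always kept symmetric.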
2.2 Define FlowCountSortMapper
package ucas.mapreduce_sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        FlowBean flowBean = new FlowBean();
        String[] split = value.toString().split("\t");

        // The phone number becomes V2
        String phoneNum = split[0];

        // The remaining flow fields populate the FlowBean, which becomes K2
        flowBean.setUpFlow(Integer.parseInt(split[1]));
        flowBean.setDownFlow(Integer.parseInt(split[2]));
        flowBean.setUpCountFlow(Integer.parseInt(split[3]));
        flowBean.setDownCountFlow(Integer.parseInt(split[4]));

        // Write K2 and V2 to the context
        context.write(flowBean, new Text(phoneNum));
    }
}
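The mapper's parsing step can be exercised on its own. A small sketch of the split-and-parse logic, using a hypothetical input line in the first job's tab-separated format:

```java
public class LineParseSketch {
    // Split one tab-separated line from the first job's output:
    // phone \t upFlow \t downFlow \t upCountFlow \t downCountFlow
    public static int[] parseFlows(String line) {
        String[] split = line.split("\t");
        int[] flows = new int[4];
        for (int i = 0; i < 4; i++) {
            flows[i] = Integer.parseInt(split[i + 1]); // skip the phone number at index 0
        }
        return flows;
    }

    public static void main(String[] args) {
        String line = "13560439658\t33\t27\t2481\t24681"; // hypothetical record
        int[] flows = parseFlows(line);
        System.out.println("upFlow=" + flows[0] + ", downCountFlow=" + flows[3]);
    }
}
```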
2.3 Define FlowCountSortReducer
package ucas.mapreduce_sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Swap key and value so the phone number comes first in the final output
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
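The reducer does no aggregation here: the records arrive already sorted by the FlowBean key, so it only swaps key and value to put the phone number first again. That inversion can be sketched in plain Java (phone numbers and counts are hypothetical):

```java
import java.util.*;

public class KeyValueSwapSketch {
    // The reducer receives (FlowBean key, phone value) pairs already sorted by
    // key and writes (phone, FlowBean) -- i.e. it swaps key and value. Sketch:
    public static List<String> swap(List<Map.Entry<Integer, String>> sorted) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> e : sorted) {
            out.add(e.getValue() + "\t" + e.getKey()); // phone first again
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> sorted = Arrays.asList(
            new AbstractMap.SimpleEntry<>(33, "13560439658"),  // hypothetical,
            new AbstractMap.SimpleEntry<>(15, "13602846565"),  // already in
            new AbstractMap.SimpleEntry<>(3,  "13480253104")); // descending order
        swap(sorted).forEach(System.out::println);
    }
}
```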
2.4 Define JobMain: the program's main entry point
package ucas.mapreduce_sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        // Create the job
        Job job = Job.getInstance(super.getConf(), "mapreduce_flowcountsort");

        // Required when the job is packaged and run on a cluster
        job.setJarByClass(JobMain.class);

        // Step 1: set the input format that produces K1 and V1
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.0.101:8020/out/flowcount_out"));

        // Step 2: set the Mapper class and the K2/V2 output types
        job.setMapperClass(FlowCountSortMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Steps 3-6 (partition, sort, combine, group) use the defaults

        // Step 7: set the Reducer class and the final output types
        job.setReducerClass(FlowCountSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Step 8: set the output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.101:8020/out/flowcountsort_out"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Launch the job
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
3. Running the job and analyzing the results
The output is sorted by upstream packet count in descending order:
That concludes this article on 2.2.8 of the Hadoop series: offline computing with MapReduce, traffic statistics sorted by upstream flow in descending order. We hope it proves helpful to fellow developers!