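This article covers sorting within partitions in Hadoop MapReduce. A custom Partitioner sends each record to a reduce task, and the shuffle then sorts the records of every partition using the map output key's compareTo.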
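Example: split the aggregated records by phone number. Numbers starting with 136, 137, 138, and 139 each go to a separate file, and all other prefixes share one file. Inside each file, records are sorted by total traffic in descending order.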
Input data (whitespace-separated fields: phone number, IP address, upstream traffic, downstream traffic, status code):

```
1863157985066 120.196.100.82 2481 24681 200
1363157995033 120.197.40.4 264 0 200
1373157993055 120.196.100.99 132 1512 200
1393154400022 120.197.40.4 240 0 200
1363157993044 120.196.100.99 1527 2106 200
1397157993055 120.197.40.4 4116 1432 200
1463157993055 120.196.100.99 1116 954 200
1383157995033 120.197.40.4 3156 2936 200
1363157983019 120.196.100.82 240 0 200
1383154400022 120.197.40.4 6960 690 200
1363157973098 120.197.40.4 3659 3538 200
1373157993055 120.196.100.99 1938 180 200
1363154400022 120.196.100.99 918 4938 200
1393157993055 120.197.40.4 180 180 200
1363157984040 120.197.40.4 1938 2910 200
1383157995033 120.196.100.82 3008 3720 200
1363154400022 120.196.100.99 7335 110349 200
1373157993055 120.196.100.99 9531 2412 200
1363157990043 120.196.100.55 11058 48243 200
1383157993055 120.196.100.82 120 120 200
1323157985066 120.196.100.82 2481 24681 200
1393157993055 120.196.100.99 1116 954 200
```
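Expected output (one file per reduce task):
- part-r-00000: numbers starting with 136
- part-r-00001: numbers starting with 137
- part-r-00002: numbers starting with 138
- part-r-00003: numbers starting with 139
- part-r-00004: all other prefixes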
Implementation
Step 1: Custom Bean

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    // Hadoop creates instances by reflection during deserialization,
    // so a no-arg constructor is required
    public FlowBean() {
    }

    // Convenience constructor that initializes all fields at once
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNumber() { return phoneNumber; }
    public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    // Serialize the object's fields to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialize the fields in exactly the order write() emitted them
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Sort by total traffic, descending. Long.compare avoids the overflow that
    // (int) (o.getSumFlow() - this.getSumFlow()) can hit on large values.
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.getSumFlow());
    }

    // Output format: upFlow \t downFlow \t sumFlow
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```
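DataOutput and DataInput are ordinary java.io interfaces, so the write/readFields pair can be exercised without a cluster. The sketch below uses a hypothetical class FlowRecord that mirrors FlowBean's fields, serialization order, and descending compareTo, but drops the Hadoop interface so it compiles with the JDK alone. It round-trips a record through a byte array, roughly the way Hadoop serializes keys between the map and reduce phases, and contrasts Long.compare with the subtract-and-cast comparison.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for FlowBean without Hadoop dependencies:
// same fields, same serialization order, same descending comparison.
class FlowRecord implements Comparable<FlowRecord> {
    String phoneNumber;
    long upFlow;
    long downFlow;
    long sumFlow;

    FlowRecord() { } // used for deserialization, like FlowBean's no-arg constructor

    FlowRecord(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    void readFields(DataInput in) throws IOException {
        // Must read in the same order write() produced the fields
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowRecord o) {
        // Descending by total traffic; Long.compare cannot overflow
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    // Serializes a record to bytes and reads it back into a fresh object
    static FlowRecord roundTrip(FlowRecord r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            r.write(out);
            out.flush();
            FlowRecord copy = new FlowRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The subtract-and-cast comparison (this = a, o = b), kept only for contrast
    static int castCompare(long a, long b) {
        return (int) (b - a);
    }
}
```

With totals of 2^32 and 0, castCompare returns 0, so the cast version would treat the two records as equal; Long.compare still puts the larger total first.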
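Step 2: Custom Mapper

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();             // one input line
        String[] fields = line.split("\\s+");       // split into fields on whitespace
        String phoneNumber = fields[0];             // phone number
        long upFlow = Long.parseLong(fields[2]);    // upstream traffic
        long downFlow = Long.parseLong(fields[3]);  // downstream traffic
        // Emit the bean as the key so the shuffle sorts by FlowBean.compareTo;
        // the phone number travels as the value for the partitioner and reducer
        context.write(new FlowBean(phoneNumber, upFlow, downFlow), new Text(phoneNumber));
    }
}
```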
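The parsing inside map() is plain string handling, so it can be checked outside Hadoop. The hypothetical ParsedLine.parse below applies the same split("\\s+") and field positions as FlowMapper. Two additions are marked in the code: trim() guards against leading whitespace, which would otherwise make fields[0] an empty string, and a field-count check replaces the ArrayIndexOutOfBoundsException the original would throw on a short line.

```java
// Hypothetical helper: the values FlowMapper extracts from one input line
final class ParsedLine {
    final String phoneNumber;
    final long upFlow;
    final long downFlow;
    final long sumFlow;

    private ParsedLine(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Field positions as in FlowMapper: 0 = phone, 2 = upstream, 3 = downstream
    static ParsedLine parse(String line) {
        // Addition: trim() so a leading space does not produce an empty fields[0]
        String[] fields = line.trim().split("\\s+");
        // Addition: reject short lines with a clear message
        if (fields.length < 4) {
            throw new IllegalArgumentException("expected at least 4 fields: " + line);
        }
        return new ParsedLine(fields[0], Long.parseLong(fields[2]), Long.parseLong(fields[3]));
    }
}
```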
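Step 3: Custom Partitioner

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        String preNum = text.toString().substring(0, 3); // first three digits of the phone number
        int partition = 4; // every other prefix goes to the last partition
        switch (preNum) { // choose the partition by prefix
            case "136":
                partition = 0; // partition numbers must start at 0 and stay below the reduce-task count
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                break;
        }
        return partition;
    }
}
```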
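The partitioning rule can also be written as a lookup table instead of a switch, which keeps the prefix list in one place and lets the required number of reduce tasks be derived from it. PrefixPartitions and requiredReduceTasks are hypothetical names; the prefix-to-partition mapping is the one FlowPartitioner uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical table-driven version of FlowPartitioner's switch
final class PrefixPartitions {
    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();

    static {
        PREFIX_TO_PARTITION.put("136", 0);
        PREFIX_TO_PARTITION.put("137", 1);
        PREFIX_TO_PARTITION.put("138", 2);
        PREFIX_TO_PARTITION.put("139", 3);
    }

    // Partition for every prefix not in the table (4 with the table above)
    static final int OTHER = PREFIX_TO_PARTITION.size();

    static int partitionFor(String phoneNumber) {
        return PREFIX_TO_PARTITION.getOrDefault(phoneNumber.substring(0, 3), OTHER);
    }

    // Listed prefixes plus one shared bucket for everything else
    static int requiredReduceTasks() {
        return OTHER + 1;
    }
}
```

If a prefix is added to the table, requiredReduceTasks() grows with it, and the value passed to job.setNumReduceTasks would have to follow.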
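Step 4: Custom Reducer

```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys with equal sumFlow compare as equal and arrive in one call,
        // so write every phone number in the group, swapping key and value
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
```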
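FlowBean.compareTo looks only at sumFlow, and by default Hadoop groups reduce input with the same comparator it sorts with. Two records with equal total traffic therefore compare as equal and arrive in a single reduce() call, with their phone numbers as the value list; the loop in FlowReducer writes each one, so none is dropped. The hypothetical GroupingSketch below models only this grouping, using a TreeMap in descending key order; it is not how Hadoop's merge sort works internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of how the shuffle groups FlowBean keys for reduce()
final class GroupingSketch {
    // Equal sumFlow -> same group; groups ordered by descending sumFlow
    static List<String> reduceOutput(String[] phones, long[] sums) {
        TreeMap<Long, List<String>> groups = new TreeMap<>(Comparator.reverseOrder());
        for (int i = 0; i < phones.length; i++) {
            groups.computeIfAbsent(sums[i], k -> new ArrayList<>()).add(phones[i]);
        }
        List<String> lines = new ArrayList<>();
        // One outer iteration per reduce() call; the inner loop mirrors "for (Text value : values)"
        for (Map.Entry<Long, List<String>> group : groups.entrySet()) {
            for (String phone : group.getValue()) {
                lines.add(phone + "\t" + group.getKey());
            }
        }
        return lines;
    }
}
```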
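Step 5: Custom Driver

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FlowDriver {
    public static void main(String[] args) throws Exception {
        // Input and output paths; fall back to the local sample paths when none are given
        if (args.length < 2) {
            args = new String[] {"src/main/resources/sort/feni", "src/main/resources/sort/feno"};
        }

        Configuration cfg = new Configuration(); // loads the configuration files
        // Run in local mode, even if a core-site.xml is on the project classpath
        cfg.set("mapreduce.framework.name", "local");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg);      // create a new job
        job.setJarByClass(FlowDriver.class); // main class
        job.setInputFormatClass(TextInputFormat.class);   // input format
        job.setOutputFormatClass(TextOutputFormat.class); // output format

        // Mapper and Reducer used by this job
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Key/value types of the map output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setPartitionerClass(FlowPartitioner.class); // custom partitioner
        job.setNumReduceTasks(5); // one reduce task per partition; FlowPartitioner returns 0..4

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path (must not exist yet)

        // Submit the job and wait for it to finish
        int res = job.waitForCompletion(true) ? 0 : 1;
        System.exit(res);
    }
}
```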
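To preview what the five output files should contain without running Hadoop, the sketch below models the job in memory under simplifying assumptions. The hypothetical LocalSimulation.run parses each line as FlowMapper does, picks a partition with FlowPartitioner's rule, and sorts each partition by total traffic in descending order. It also mimics two Hadoop behaviors tied to setNumReduceTasks: with a single reduce task the partitioner is never called, and with more than one, an index outside [0, numReduceTasks) fails the map task with an "Illegal partition" error.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the job: partition, then sort each partition
final class LocalSimulation {
    private static final class Rec {
        final String phone;
        final long up;
        final long down;
        final long sum;

        Rec(String phone, long up, long down) {
            this.phone = phone;
            this.up = up;
            this.down = down;
            this.sum = up + down;
        }
    }

    // Same prefix rule as FlowPartitioner
    static int partitionFor(String phone) {
        switch (phone.substring(0, 3)) {
            case "136": return 0;
            case "137": return 1;
            case "138": return 2;
            case "139": return 3;
            default: return 4;
        }
    }

    // Returns the lines of part-r-00000 .. part-r-(numReduceTasks - 1), in order
    static List<List<String>> run(List<String> input, int numReduceTasks) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String line : input) {
            String[] f = line.trim().split("\\s+");
            Rec r = new Rec(f[0], Long.parseLong(f[2]), Long.parseLong(f[3]));
            // With a single reduce task Hadoop never calls the partitioner
            int p = numReduceTasks > 1 ? partitionFor(r.phone) : 0;
            if (p >= numReduceTasks) {
                // Hadoop rejects such a record with an "Illegal partition" IOException
                throw new IllegalStateException("Illegal partition " + p + " for " + r.phone);
            }
            partitions.get(p).add(r);
        }
        List<List<String>> files = new ArrayList<>();
        for (List<Rec> part : partitions) {
            // Descending total traffic, as FlowBean.compareTo defines. List.sort is
            // stable, so ties keep input order here; Hadoop makes no such promise.
            part.sort(Comparator.comparingLong((Rec r) -> r.sum).reversed());
            List<String> lines = new ArrayList<>();
            for (Rec r : part) {
                // TextOutputFormat writes key, tab, value; the value is FlowBean.toString()
                lines.add(r.phone + "\t" + r.up + "\t" + r.down + "\t" + r.sum);
            }
            files.add(lines);
        }
        return files;
    }
}
```

Run against the 22 sample lines with 5 reduce tasks, it puts 8, 3, 4, 4, and 3 records into the five files, and part-r-00000 starts with 1363154400022, whose total of 117684 is the largest.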