This article introduces how to write a custom partitioner in MapReduce; hopefully it serves as a useful reference for developers who need to route map output to different reduce tasks.
Requirement: split the following data into two outputs. The sixth field of each record holds the lottery draw result; using 15 as the dividing point, save results above 15 to one file and results of 15 and below to another.
(The sample data is shown as an image in the original post.)
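Since the image is not reproduced here, the record below is made up purely to illustrate the layout the code assumes: tab-separated fields, with the draw result in the sixth column.
110	7	2021-07-09	A	B	23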
Define the Mapper class:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MapDemo01 extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit each input line unchanged as the key; partitioning is done on the key.
        context.write(value, NullWritable.get());
    }
}
Define the Reducer class:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ReduceDemo01 extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Simply emit the key; the reducer only exists so that each partition ends up in its own output file.
        context.write(key, NullWritable.get());
    }
}
Define the custom Partitioner class:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class Partition extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        // The sixth tab-separated field holds the draw result.
        String result = text.toString().split("\\t")[5];
        if (Integer.parseInt(result) > 15) {
            return 1;   // results above 15 go to reducer 1
        } else {
            return 0;   // results of 15 and below go to reducer 0
        }
    }
}
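Before submitting the job, the partition logic can be sanity-checked locally by calling getPartition directly. The sketch below is only for illustration (the class name PartitionCheck and the sample record are made up); it does not touch HDFS or the cluster.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        Partition partitioner = new Partition();
        // A made-up tab-separated record whose sixth field is 23.
        Text record = new Text("110\t7\t2021-07-09\tA\tB\t23");
        int p = partitioner.getPartition(record, NullWritable.get(), 2);
        System.out.println(p);   // prints 1, because 23 > 15
    }
}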
The main entry point of the program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DriverDemo01 {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Partitioner");
        job.setJarByClass(DriverDemo01.class);

        job.setMapperClass(MapDemo01.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(ReduceDemo01.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Use the custom partitioner; it only ever returns 0 or 1, so two reduce
        // tasks are enough to produce the two output files the requirement asks for.
        job.setPartitionerClass(Partition.class);
        job.setNumReduceTasks(2);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
After the code is written, package it into a jar, upload it to the server, and run it:
hadoop jar <jar name> <fully qualified main class> <input path> <output path>
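For example, assuming the jar is named partition-demo.jar, the driver class is DriverDemo01 in the default package, and the data sits under /input (all of these names are placeholders), the submission and a quick check of the two output files would look like:
hadoop jar partition-demo.jar DriverDemo01 /input /output
hdfs dfs -cat /output/part-r-00001   # records whose sixth field is above 15
hdfs dfs -cat /output/part-r-00000   # records whose sixth field is 15 or below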
That wraps up this article on MapReduce custom partitioners; hopefully it is helpful to fellow developers.