本文主要是介绍MR案例:Left Outer Join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
适用场景:适合两个大表连接操作
用法:Join操作在reduce task中完成 【默认的join方式】,map端按照连接字段进行hash,reduce 端完成连接操作
代码实现:
package join.map;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class JoinOn {public static void main(String[] args) throws Exception {//临时配置windows的环境变量System.setProperty("hadoop.home.dir", "D:\\workspace\\hadoop-2.2.0");Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JoinOn.class);job.setMapperClass(JOMapper.class);job.setReducerClass(JOReducer.class);job.setMapOutputKeyClass(VLongWritable.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true)? 0:1);}public static class JOMapper extends Mapper<LongWritable, Text, VLongWritable, Text>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//获取当前分片所对应的文件名String name = ((FileSplit)context.getInputSplit()).getPath().getName();String[] splited = value.toString().split("\t");if(name.endsWith("sales")){//sales表//<key,value> --> <id, things+':'+name+'\t'+id>context.write(new VLongWritable(Long.parseLong(splited[1])), new Text(name+":"+value.toString()));}else if(name.endsWith("things")) {//<key,value> --> <id, sales+':'+id+'\t'+name>context.write(new VLongWritable(Long.parseLong(splited[0])), new Text(name+":"+value.toString()));} }}public static class JOReducer extends Reducer<VLongWritable, Text, Text, Text>{@Overrideprotected void reduce(VLongWritable key, Iterable<Text> v2s, Context context)throws IOException, InterruptedException {//分别存储sales和things两表的nameList<String> sales=new ArrayList<String>();List<String> things=new ArrayList<String>();for(Text text : v2s){String[] splited = text.toString().split(":");//sales表中的数据if(splited[0].endsWith("sales")){//加入集合sales.add(splited[1]);}//things表中数据else if(splited[0].endsWith("things")){things.add(splited[1]);}}//笛卡尔积/*** 左外连接:只要求左表中有数据即可*/if(sales.size()!=0 
/*&& things.size()!=0*/){for(String sale : sales){//如果右表中没有数据,则使用 NULL 代替if(things.size()==0){context.write(new Text(sale), new Text("NULL"+"\t"+"NILL"));}else {//如果右表中有数据,则直接输出for(String thing : things){context.write(new Text(sale), new Text(thing));}}} }}}
}
MR过程分解
input
//sales.txt
Joe 2
Hank 4
Ali 0
Eve 3
Hank 2
//things.txt
2 Tie
4 Coat
3 Hat
1 Scarf
map
key -> value
2 -> sales:Joe 2
4 -> sales:Hank 4
0 -> sales:Ali 0
3 -> sales:Eve 3
2 -> sales:Hank 2
key -> value
2 -> things:2 Tie
4 -> things:4 Coat
3 -> things:3 Hat
1 -> things:1 Scarf
shuffle
2 [sales:Joe 2;sales:Hank 2;things:2 Tie]
4 [sales:Hank 4;things:4 Coat]
0 [sales:Ali 0;]
3 [sales:Eve 3;things:3 Hat]
1 [things:1 Scarf]
reduce
2 salesList: Joe 2;Hank 2; thingsList: 2 Tie; ----> Joe 2 2 Tie / Hank 2 2 Tie
4 salesList: Hank 4; thingsList: 4 Coat; ----> Hank 4 4 Coat
0 salesList: Ali 0; thingsList: (空) ----> Ali 0 NULL NULL
3 salesList: Eve 3; thingsList: 3 Hat; ----> Eve 3 3 Hat
1 salesList: (空) thingsList: 1 Scarf; ----> 无输出(左表中没有对应记录)
output
//sales.txt join things.txt
Joe 2 2 Tie
Hank 2 2 Tie
Hank 4 4 Coat
Ali 0 NULL NULL
Eve 3 3 Hat
参考文章:http://www.cnblogs.com/skyl/p/4737347.html
这篇关于MR案例:Left Outer Join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!