本文主要是介绍Hadoop 多表 join:map side join 范例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如有最常见的 reduce side join,map side join,semi join 等。今天我们要讨论的是第 2 种:map side join,这种 join 在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,一次只能处理一张表。
1、原理:
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
2、环境:
本实例需要的测试文件及 hdfs 文件存放目录如下:
hadoop fs -ls /test/decli
Found 4 items
-rw-r--r-- 2 root supergroup 152 2013-03-06 02:05 /test/decli/login
drwxr-xr-x - root supergroup 0 2013-03-06 02:45 /test/decli/output
-rw-r--r-- 2 root supergroup 12 2013-03-06 02:12 /test/decli/sex
-rw-r--r-- 2 root supergroup 72 2013-03-06 02:44 /test/decli/user
测试文件内容分别为:
root@master 192.168.120.236 02:58:03 ~/test/table >
cat login # 登录表,需要判断 uid 列是否有效,并得到对应用户名、性别、访问次数
1 0 20121213
2 0 20121213
3 1 20121213
4 1 20121213
1 0 20121114
2 0 20121114
3 1 20121114
4 1 20121114
1 0 20121213
1 0 20121114
9 0 20121114
root@master 192.168.120.236 02:58:08 ~/test/table >
cat sex # 性别表
0 男
1 女
root@master 192.168.120.236 02:58:13 ~/test/table >
cat user # 用户属性表
1 张三 hubei
3 王五 tianjin
4 赵六 guangzhou
2 李四 beijing
root@master 192.168.120.236 02:58:16 ~/test/table >
测试环境 hadoop 版本:
好了,废话少说,上代码:
3、代码:
001 | import java.io.BufferedReader; |
002 | import java.io.FileReader; |
003 | import java.io.IOException; |
004 | import java.util.HashMap; |
007 | import org.apache.hadoop.conf.Configuration; |
008 | import org.apache.hadoop.conf.Configured; |
009 | import org.apache.hadoop.filecache.DistributedCache; |
010 | import org.apache.hadoop.fs.Path; |
011 | import org.apache.hadoop.io.LongWritable; |
012 | import org.apache.hadoop.io.Text; |
013 | import org.apache.hadoop.mapreduce.Job; |
014 | import org.apache.hadoop.mapreduce.Mapper; |
015 | import org.apache.hadoop.mapreduce.Reducer; |
016 | import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
017 | import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
018 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
019 | import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
020 | import org.apache.hadoop.util.GenericOptionsParser; |
021 | import org.apache.hadoop.util.Tool; |
022 | import org.apache.hadoop.util.ToolRunner; |
024 | public class MultiTableJoin extends Configured implements Tool { |
025 | public static class MapClass extends Mapper<LongWritable, Text, Text, Text> { |
028 | private Map<String, String> userMap = new HashMap<String, String>(); |
029 | private Map<String, String> sexMap = new HashMap<String, String>(); |
031 | private Text oKey = new Text(); |
032 | private Text oValue = new Text(); |
037 | protected void setup(Context context) throws IOException, |
038 | InterruptedException { |
039 | BufferedReader in = null ; |
043 | Path[] paths = DistributedCache.getLocalCacheFiles(context |
044 | .getConfiguration()); |
045 | String uidNameAddr = null ; |
046 | String sidSex = null ; |
047 | for (Path path : paths) { |
048 | if (path.toString().contains( "user" )) { |
049 | in = new BufferedReader( new FileReader(path.toString())); |
050 | while ( null != (uidNameAddr = in.readLine())) { |
051 | userMap.put(uidNameAddr.split( "\t" , - 1 )[ 0 ], |
052 | uidNameAddr.split( "\t" , - 1 )[ 1 ]); |
054 | } else if (path.toString().contains( "sex" )) { |
055 | in = new BufferedReader( new FileReader(path.toString())); |
056 | while ( null != (sidSex = in.readLine())) { |
057 | sexMap.put(sidSex.split( "\t" , - 1 )[ 0 ], sidSex.split( |
062 | } catch (IOException e) { |
069 | } catch (IOException e) { |
075 | public void map(LongWritable key, Text value, Context context) |
076 | throws IOException, InterruptedException { |
078 | kv = value.toString().split( "\t" ); |
080 | if (userMap.containsKey(kv[ 0 ]) && sexMap.containsKey(kv[ 1 ])) { |
081 | oKey.set(userMap.get(kv[ 0 ]) + "\t" + sexMap.get(kv[ 1 ])); |
083 | context.write(oKey, oValue); |
089 | public static class Reduce extends Reducer<Text, Text, Text, Text> { |
094 | public void reduce(Text key, Iterable<Text> values, Context context) |
095 | throws IOException, InterruptedException { |
098 | for (Text val : values) { |
099 | sumCount += Integer.parseInt(val.toString()); |
102 | context.write(key, new Text(String.valueOf(sumCount))); |
107 | public int run(String[] args) throws Exception { |
108 | Job job = new Job(getConf(), "MultiTableJoin" ); |
110 | job.setJobName( "MultiTableJoin" ); |
111 | job.setJarByClass(MultiTableJoin. class ); |
112 | job.setMapperClass(MapClass. class ); |
113 | job.setReducerClass(Reduce. class ); |
115 | job.setInputFormatClass(TextInputFormat. class ); |
116 | job.setOutputFormatClass(TextOutputFormat. class ); |
118 | job.setOutputKeyClass(Text. class ); |
119 | job.setOutputValueClass(Text. class ); |
121 | String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), |
122 | args).getRemainingArgs(); |
125 | DistributedCache.addCacheFile( new Path(otherArgs[ 1 ]).toUri(), job |
126 | .getConfiguration()); |
127 | DistributedCache.addCacheFile( new Path(otherArgs[ 2 ]).toUri(), job |
128 | .getConfiguration()); |
130 | FileInputFormat.addInputPath(job, new Path(otherArgs[ 3 ])); |
131 | FileOutputFormat.setOutputPath(job, new Path(otherArgs[ 4 ])); |
133 | return job.waitForCompletion( true ) ? 0 : 1 ; |
136 | public static void main(String[] args) throws Exception { |
137 | int res = ToolRunner.run( new Configuration(), new MultiTableJoin(), |
运行命令:
1 | hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output |
4、结果:
运行结果:
root@master 192.168.120.236 02:47:18 ~/test/table >
hadoop fs -cat /test/decli/output/*|column -t
cat: File does not exist: /test/decli/output/_logs
张三 男 4
李四 男 2
王五 女 2
赵六 女 2
root@master 192.168.120.236 02:47:26 ~/test/table >
TIPS:
更多关于 hadoop mapreduce 相关 join 介绍,请参考之前的博文:
MapReduce 中的两表 join 几种方案简介
http://my.oschina.net/leejun2005/blog/95186
这篇关于Hadoop 多表 join:map side join 范例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!