Hadoop 多表 join:map side join 范例

2024-03-19 13:58
文章标签 多表 map join hadoop 范例 side

本文主要是介绍Hadoop 多表 join:map side join 范例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在没有 pig 或者 hive 的环境下,直接在 mapreduce 中自己实现 join 是一件极其蛋疼的事情,MR中的join分为好几种,比如有最常见的 reduce side join,map side join,semi join 等。今天我们要讨论的是第 2 种:map side join,这种 join 在处理多个小表关联大表时非常有用,而 reduce join 在处理多表关联时是比较麻烦的,一次只能处理一张表。

1、原理:

      之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。但 Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

2、环境:

本实例需要的测试文件及 hdfs 文件存放目录如下:

hadoop fs -ls /test/decli
Found 4 items
-rw-r--r--   2 root supergroup        152 2013-03-06 02:05 /test/decli/login
drwxr-xr-x   - root supergroup          0 2013-03-06 02:45 /test/decli/output
-rw-r--r--   2 root supergroup         12 2013-03-06 02:12 /test/decli/sex
-rw-r--r--   2 root supergroup         72 2013-03-06 02:44 /test/decli/user

测试文件内容分别为:

root@master 192.168.120.236 02:58:03 ~/test/table >
cat login  # 登录表,需要判断 uid 列是否有效,并得到对应用户名、性别、访问次数
1       0       20121213 
2       0       20121213 
3       1       20121213 
4       1       20121213 
1       0       20121114 
2       0       20121114 
3       1       20121114 
4       1       20121114 
1       0       20121213 
1       0       20121114
9       0       20121114
root@master 192.168.120.236 02:58:08 ~/test/table >
cat sex # 性别表
0       男
1       女
root@master 192.168.120.236 02:58:13 ~/test/table >
cat user # 用户属性表
1       张三    hubei 
3       王五    tianjin 
4       赵六    guangzhou 
2       李四    beijing 
root@master 192.168.120.236 02:58:16 ~/test/table >

测试环境 hadoop 版本:

view source
print ?
1echo $HADOOP_HOME
2/work/hadoop-0.20.203.0

好了,废话少说,上代码:

3、代码:

view source
print ?
001import java.io.BufferedReader;
002import java.io.FileReader;
003import java.io.IOException;
004import java.util.HashMap;
005import java.util.Map;
006 
007import org.apache.hadoop.conf.Configuration;
008import org.apache.hadoop.conf.Configured;
009import org.apache.hadoop.filecache.DistributedCache;
010import org.apache.hadoop.fs.Path;
011import org.apache.hadoop.io.LongWritable;
012import org.apache.hadoop.io.Text;
013import org.apache.hadoop.mapreduce.Job;
014import org.apache.hadoop.mapreduce.Mapper;
015import org.apache.hadoop.mapreduce.Reducer;
016import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
017import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
018import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
019import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
020import org.apache.hadoop.util.GenericOptionsParser;
021import org.apache.hadoop.util.Tool;
022import org.apache.hadoop.util.ToolRunner;
023 
024public class MultiTableJoin extends Configured implements Tool {
025    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
026 
027        // 用于缓存 sex、user 文件中的数据
028        private Map<String, String> userMap = new HashMap<String, String>();
029        private Map<String, String> sexMap = new HashMap<String, String>();
030 
031        private Text oKey = new Text();
032        private Text oValue = new Text();
033        private String[] kv;
034 
035        // 此方法会在map方法执行之前执行
036        @Override
037        protected void setup(Context context) throws IOException,
038                InterruptedException {
039            BufferedReader in = null;
040 
041            try {
042                // 从当前作业中获取要缓存的文件
043                Path[] paths = DistributedCache.getLocalCacheFiles(context
044                        .getConfiguration());
045                String uidNameAddr = null;
046                String sidSex = null;
047                for (Path path : paths) {
048                    if (path.toString().contains("user")) {
049                        in = new BufferedReader(new FileReader(path.toString()));
050                        while (null != (uidNameAddr = in.readLine())) {
051                            userMap.put(uidNameAddr.split("\t", -1)[0],
052                                    uidNameAddr.split("\t", -1)[1]);
053                        }
054                    else if (path.toString().contains("sex")) {
055                        in = new BufferedReader(new FileReader(path.toString()));
056                        while (null != (sidSex = in.readLine())) {
057                            sexMap.put(sidSex.split("\t", -1)[0], sidSex.split(
058                                    "\t", -1)[1]);
059                        }
060                    }
061                }
062            catch (IOException e) {
063                e.printStackTrace();
064            finally {
065                try {
066                    if (in != null) {
067                        in.close();
068                    }
069                catch (IOException e) {
070                    e.printStackTrace();
071                }
072            }
073        }
074 
075        public void map(LongWritable key, Text value, Context context)
076                throws IOException, InterruptedException {
077 
078            kv = value.toString().split("\t");
079            // map join: 在map阶段过滤掉不需要的数据
080            if (userMap.containsKey(kv[0]) && sexMap.containsKey(kv[1])) {
081                oKey.set(userMap.get(kv[0]) + "\t" + sexMap.get(kv[1]));
082                oValue.set("1");
083                context.write(oKey, oValue);
084            }
085        }
086 
087    }
088 
089    public static class Reduce extends Reducer<Text, Text, Text, Text> {
090 
091        // private Text oValue = new Text();
092        // private StringBuilder sb;
093 
094        public void reduce(Text key, Iterable<Text> values, Context context)
095                throws IOException, InterruptedException {
096            int sumCount = 0;
097 
098            for (Text val : values) {
099                sumCount += Integer.parseInt(val.toString());
100            }
101 
102            context.write(key, new Text(String.valueOf(sumCount)));
103        }
104 
105    }
106 
107    public int run(String[] args) throws Exception {
108        Job job = new Job(getConf(), "MultiTableJoin");
109 
110        job.setJobName("MultiTableJoin");
111        job.setJarByClass(MultiTableJoin.class);
112        job.setMapperClass(MapClass.class);
113        job.setReducerClass(Reduce.class);
114 
115        job.setInputFormatClass(TextInputFormat.class);
116        job.setOutputFormatClass(TextOutputFormat.class);
117 
118        job.setOutputKeyClass(Text.class);
119        job.setOutputValueClass(Text.class);
120 
121        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),
122                args).getRemainingArgs();
123 
124        // 我们把第1、2个参数的地址作为要缓存的文件路径
125        DistributedCache.addCacheFile(new Path(otherArgs[1]).toUri(), job
126                .getConfiguration());
127        DistributedCache.addCacheFile(new Path(otherArgs[2]).toUri(), job
128                .getConfiguration());
129 
130        FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
131        FileOutputFormat.setOutputPath(job, new Path(otherArgs[4]));
132 
133        return job.waitForCompletion(true) ? 0 1;
134    }
135 
136    public static void main(String[] args) throws Exception {
137        int res = ToolRunner.run(new Configuration(), new MultiTableJoin(),
138                args);
139        System.exit(res);
140    }
141 
142}

运行命令:

view source
print ?
1hadoop jar MultiTableJoin.jar MultiTableJoin /test/decli/sex /test/decli/user /test/decli/login /test/decli/output

4、结果:

运行结果:

root@master 192.168.120.236 02:47:18 ~/test/table >
hadoop fs -cat /test/decli/output/*|column -t
cat: File does not exist: /test/decli/output/_logs
张三  男  4
李四  男  2
王五  女  2
赵六  女  2
root@master 192.168.120.236 02:47:26 ~/test/table >

TIPS:

更多关于 hadoop mapreduce 相关 join 介绍,请参考之前的博文:

MapReduce 中的两表 join 几种方案简介

http://my.oschina.net/leejun2005/blog/95186

这篇关于Hadoop 多表 join:map side join 范例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826213

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Collection List Set Map的区别和联系

Collection List Set Map的区别和联系 这些都代表了Java中的集合,这里主要从其元素是否有序,是否可重复来进行区别记忆,以便恰当地使用,当然还存在同步方面的差异,见上一篇相关文章。 有序否 允许元素重复否 Collection 否 是 List 是 是 Set AbstractSet 否

MySQL基础(7)- 多表查询

目录 一、笛卡尔积的错误与正确的多表查询 1.出现笛卡尔积错误 2.正确的多表查询:需要有连接条件 3.查询多个表中都存在的字段 4.SELECT和WHERE中使用表的别名 二、等值连接vs非等值连接、自连接vs非自连接 1.等值连接  vs  非等值连接 2.自连接  vs  非自连接 3.内连接  vs  外连接 4.UNION  和 UNION ALL的使用 5.7种J

MySQL学习笔记-join语句类型

join从句的类型:内链接(inner) 全外连接(full outer) 左外连接(left outer) 右外连接(right outer) 交叉链接(cross) 连接条件:使用ON设定连接条件,也可以用WHERE代替 · ON:设定连接条件 · WHERE:进行结果集记录的过滤 一,内连接inner join:  内连接是返回左表及右表符合连接条件的记录,在MySQL中JO

Map

Map 是 Java 中用于存储键值对的集合接口。以下是对 Map 的详细介绍: 特点 键值对存储:每个元素包含一个键和一个值。 键唯一:键不能重复,但值可以重复。 无序/有序:根据具体实现,键值对的顺序可能无序(如 HashMap)或有序(如 TreeMap、LinkedHashMap)。 主要实现类 HashMap 基于哈希表,无序存储。 允许一个 null 键和多个 null 值。

Java中集合类Set、List和Map的区别

Java中的集合包括三大类,它们是Set、List和Map,它们都处于java.util包中,Set、List和Map都是接口,它们有各自的实现类。Set的实现类主要有HashSet和TreeSet,List的实现类主要有ArrayList,Map的实现类主要有HashMap和TreeMap。那么它们有什么区别呢? Set中的对象不按特定方式排序,并且没有重复对象。但它的有些实现类能对集合中的对

C++数据结构重要知识点(5)(哈希表、unordered_map和unordered_set封装)

1.哈希思想和哈希表 (1)哈希思想和哈希表的区别 哈希(散列、hash)是一种映射思想,本质上是值和值建立映射关系,key-value就使用了这种思想。哈希表(散列表,数据结构),主要功能是值和存储位置建立映射关系,它通过key-value模型中的key来定位数组的下标,将value存进该位置。 哈希思想和哈希表数据结构这两个概念要分清,哈希是哈希表的核心思想。 (2)unordered