Map端的Join map-side join

2024-01-26 22:30
文章标签 map join 端的 side

本文主要是介绍Map端的Join map-side join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  • map-side join:(最为高效)
  • 核心思想:将小表进行分布式缓存,在map-task阶段读取缓存文件数据存储到内存数据结构中,以供reduce阶段连接查找。
  • 适用场景:有一个或者多个小表(文件)
  • 优点:将小表缓存,可以高效查询;由于在map阶段进行连接,所以将会大大减小map到reduce端的数据传输,从而减少不必要的shuffle耗时,提高整个mr的执行效率
  • 缺点:如果业务全是大表不适合
  • semi-join(半连接):
  • 核心思想:将大表过滤或者清洗后进行缓存,从而转换为map-side端的join。

导入的包 注意导入长包
以及数据 对应三个文件

/*** 作者:Shishuai* 文件名:MapSideJoinDemo* 时间:2019/9/4 19:11*/package com.mapjoin_reducejoin;import ali.mr.day02.MapSideJoin;
import jdk.nashorn.internal.ir.BaseNode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.jboss.netty.util.internal.ConcurrentHashMap;import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;/*** map-side join:(最为高效)* 核心思想:将小表进行分布式缓存,在map-task阶段读取缓存文件数据存储到内存数据结构中,以供reduce阶段连接查找。* 适用场景:有一个或者多个小表(文件)* 优点:将小表缓存,可以高效查询;由于在map阶段进行连接,所以将会大大减小map到reduce端的数据传输,从而减少不必要的shuffle耗时,提高整个mr的执行效率* 缺点:如果业务全是大表不适合** semi-join(半连接):* 核心思想:将大表过滤或者清洗后进行缓存,从而转换为map-side端的join。** login:uid	sexid	logindate1	1	2017-04-17 08:16:202   2	2017-04-15 06:18:203   1	2017-04-16 05:16:244   2	2017-04-14 03:18:205   1	2017-04-13 02:16:256   2	2017-04-13 01:15:207   1	2017-04-12 08:16:348   2	2017-04-11 09:16:209   0	2017-04-10 05:16:50sex:sexMap0	不知道1	男2	女user uname1	小红2   小行3   小通4   小闪5   小镇6   小振7   小秀8   小微9   小懂10	小明11  小刚12  小举13  小黑14  小白15  小鹏16  小习输出:
loginuid	sex		uname	logindate1	男	小红	2017-04-17 08:16:202	女	小行	2017-04-15 06:18:203	男	小通	2017-04-16 05:16:244	女	小闪	2017-04-14 03:18:205	男	小镇	2017-04-13 02:16:256	女	小振	2017-04-13 01:15:207	男	小秀	2017-04-12 08:16:348	女	小微	2017-04-11 09:16:209	不知道	小懂	2017-04-10 05:16:50*** @Author Shishuai* @Email 1198319583@qq.com* @Description //TODO* @Date 19:39 2019/9/4* @Param* @Retrun 这个打包到集群上运行 两个表ur sex 以及login已经上传到hdfs 而且使用的是ha模式 我的端口是默认8020没改 改过的一般是9000**/

主要的一个setup和一个map函数
在setup 读取缓存文件 就是两个小表 ur 和 sex 因为就两列 读出来数据存到map中
在map 一行一行的读取login数据,切割后得到id
然后根据id取出对应map的值

public class MapSideJoinDemo {//自定义的mapper类public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable>{public Text k = new Text();//读取缓存文件,并按照文件名称读取到map或者别的数据结构中//定义一个存储sex缓存的数据结构Map<String, String> sexMap = new ConcurrentHashMap<String, String>();Map<String, String> userMap = new ConcurrentHashMap<String, String>();//读取缓存在hdfs上的两个表文件//找到这两个缓存文件 将他们放入map中 因为就两列 所以@Overrideprotected void setup(Context context) throws IOException, InterruptedException {Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());for(Path p : paths){String fileName = p.getName();BufferedReader bufferedReader = null;if(fileName.endsWith("sex")){bufferedReader = new BufferedReader(new FileReader(new File(p.toString())));while(bufferedReader.ready()){String line = bufferedReader.readLine();String sexs[] = line.split("\t");sexMap.put(sexs[0], sexs[1]);}}else if(fileName.equals("ur")){bufferedReader = new BufferedReader(new FileReader(new File(p.toString())));while(bufferedReader.ready()){String line = bufferedReader.readLine();String users[] = line.split("\t");userMap.put(users[0], users[1]);}}if(bufferedReader != null){bufferedReader.close();}}}//抽象map函数   (map阶段的核心业务逻辑)//然后进行map过程 一行一行读入login表中的信息//uid  sexid   time//1	   1	 2017-04-17 08:16:20  比如读入这个 切割后根据前边两个id去拿两个map对应的值@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split("\t");String uid = words[0];String sex_id = words[1];String uname = "";String sexlab = "";sexlab = sexMap.getOrDefault(sex_id, "");uname = userMap.getOrDefault(uid, "");this.k.set(uid + "\t" + sexlab + "\t" + uname + "\t" + words[2]);context.write(k, NullWritable.get());}}//驱动方法public static void main(String[] args) {try {//1、获取配置对象并进行属性设置Configuration conf = new Configuration();//对conf设置conf.set("fs.defaultFS", "hdfs://qf");conf.set("dfs.nameservices", "qf");conf.set("dfs.ha.namenodes.qf", "nn1, nn2");conf.set("dfs.namenode.rpc-address.qf.nn1", "hadoop01:8020");conf.set("dfs.namenode.rpc-address.qf.nn2", "hadoop02:8020");conf.set("dfs.client.failover.proxy.provider.qf", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");//2、获取jobJob job = Job.getInstance(conf, "mapSideJoin");//3、对job设置运行主类job.setJarByClass(MapSideJoinDemo.class);//4、对job的map端属性设置job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//设置缓存 (缓存文件读取不了)//job.setCacheFiles();job.addCacheFile(new URI("hdfs://qf:8020/sex"));job.addCacheFile(new URI("hdfs://qf:8020/ur"));//6、设置job的输入路径和输出路径FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//7、提交作业int success = job.waitForCompletion(true) ? 0 : 1;//8、退出System.exit(success);} catch (IOException e) {e.printStackTrace();} catch (URISyntaxException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}
}

[root@hadoop01 join]# yarn jar /home/hadoopDemo-1.0-SNAPSHOT.jar com.mapjoin_reducejoin.MapSideJoinDemo /login /out/03

结果文件 没问题
在这里插入图片描述
在这里插入图片描述

这篇关于Map端的Join map-side join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/648250

相关文章

Collection List Set Map的区别和联系

Collection List Set Map的区别和联系 这些都代表了Java中的集合,这里主要从其元素是否有序,是否可重复来进行区别记忆,以便恰当地使用,当然还存在同步方面的差异,见上一篇相关文章。 有序否 允许元素重复否 Collection 否 是 List 是 是 Set AbstractSet 否

MySQL学习笔记-join语句类型

join从句的类型:内链接(inner) 全外连接(full outer) 左外连接(left outer) 右外连接(right outer) 交叉链接(cross) 连接条件:使用ON设定连接条件,也可以用WHERE代替 · ON:设定连接条件 · WHERE:进行结果集记录的过滤 一,内连接inner join:  内连接是返回左表及右表符合连接条件的记录,在MySQL中JO

Map

Map 是 Java 中用于存储键值对的集合接口。以下是对 Map 的详细介绍: 特点 键值对存储:每个元素包含一个键和一个值。 键唯一:键不能重复,但值可以重复。 无序/有序:根据具体实现,键值对的顺序可能无序(如 HashMap)或有序(如 TreeMap、LinkedHashMap)。 主要实现类 HashMap 基于哈希表,无序存储。 允许一个 null 键和多个 null 值。

Java中集合类Set、List和Map的区别

Java中的集合包括三大类,它们是Set、List和Map,它们都处于java.util包中,Set、List和Map都是接口,它们有各自的实现类。Set的实现类主要有HashSet和TreeSet,List的实现类主要有ArrayList,Map的实现类主要有HashMap和TreeMap。那么它们有什么区别呢? Set中的对象不按特定方式排序,并且没有重复对象。但它的有些实现类能对集合中的对

C++数据结构重要知识点(5)(哈希表、unordered_map和unordered_set封装)

1.哈希思想和哈希表 (1)哈希思想和哈希表的区别 哈希(散列、hash)是一种映射思想,本质上是值和值建立映射关系,key-value就使用了这种思想。哈希表(散列表,数据结构),主要功能是值和存储位置建立映射关系,它通过key-value模型中的key来定位数组的下标,将value存进该位置。 哈希思想和哈希表数据结构这两个概念要分清,哈希是哈希表的核心思想。 (2)unordered

【C++STL(十四)】一个哈希桶简单模拟实现unordered_map/set

目录 前言 一、改造哈希桶 改造结点类 改造主体  模板参数改造  迭代器(重点) 改造完整哈希桶 unordered_map模拟实现 unordered_set模拟实现 前言 前面的文章都有说unordered_map/set的底层结构就是哈希表,严格来说是哈希桶,那么接下来我们就尝试使用同一个哈希桶来模拟实现一下。整体的逻辑和一棵红黑树封装map/set类似,所以

Java中Map取值转String Null值处理

Map<String, Object> 直接取值转String String value = (String)map.get("key") 当map.get(“key”)为Null值时会报错。 使用String类的valueOf静态方法可以解决这个问题 String value = String.valueOf(map.get("key"))

Creating OpenAI Gym Environment from Map Data

题意:从地图数据创建 OpenAI Gym 环境 问题背景: I am just starting out with reinforcement learning and trying to create a custom environment with OpenAI gym. However, I am stumped with trying to create an enviro

多线程 | join方法

文章目录 1. 作用2. 用法3. 异常4. 源码为什么使用wait方法 5. 如何实现按照指定顺序执行线程6. 线程运行状态 1. 作用 在 Java 多线程中,join方法用于等待一个线程执行完毕。当一个线程调用另一个线程的join方法时,当前线程会进入等待状态,直到被调用的线程执行完毕。这使得开发者可以控制线程的执行顺序,确保某些关键线程在其他线程之前完成执行。 2. 用

SylixOS pthread_join退出

1 问题描述 在移植中间件过程中,在SylixOS下调用pthread_join时,如果线程在pthread_join等待之前结束,则线程返回无效线程错误值。在Linux下这种调用会正常返回。两种实现是有差别的,实现的原理分别如下。 2 函数实现机制 2.1 实现机制 在SylixOS下调用pthread_join时,如果线程在pthread_join等待之前结束,线程返回无效线程错误标志