Mapjoin和Reducejoin案例

2023-12-14 06:18
文章标签 案例 mapjoin reducejoin

本文主要是介绍Mapjoin和Reducejoin案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Mapjoin案例

  1.需求:有两个文件,分别是订单表、商品表,

  订单表有三个属性分别为订单时间、商品id、订单id(表示内容量大的表),

  商品表有两个属性分别为商品id、商品名称(表示内容量小的表,用于加载到内存),

  要求结果文件为在订单表中的每一行最后添加商品id对应的商品名称。

  2.解决思路:

  将商品表加载到内存中,然后再map方法中将订单表中的商品id对应的商品名称添加到该行的最后,不需要Reducer,并在Driver执行类中设置setCacheFile和numReduceTask。

  3.代码如下:

public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{HashMap<String, String> pdMap = new HashMap<>();//1.商品表加载到内存protected void setup(Context context) throws IOException {//加载缓存文件BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "Utf-8"));String line;while(StringUtils.isNotEmpty(line = br.readLine()) ) {//切分String[] fields = line.split("\t");//缓存pdMap.put(fields[0], fields[1]);}br.close();}//2.map传输@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {//获取数据String line = value.toString();//切割String[] fields = line.split("\t");//获取订单中商品idString pid = fields[1];//根据订单商品id获取商品名String pName = pdMap.get(pid);//拼接数据line = line + "\t" + pName;//输出context.write(new Text(line), NullWritable.get());}
}public class CacheDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {// 1.获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2.获取jar包job.setJarByClass(CacheDriver.class);// 3.获取自定义的mapper与reducer类job.setMapperClass(CacheMapper.class);// 5.设置reduce输出的数据类型(最终的数据类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 6.设置输入存在的路径与处理后的结果路径FileInputFormat.setInputPaths(job, new Path("c://table1029//in"));FileOutputFormat.setOutputPath(job, new Path("c://table1029//out"));//加载缓存商品数据job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));//设置一下reducetask的数量job.setNumReduceTasks(0);// 7.提交任务boolean rs = job.waitForCompletion(true);System.out.println(rs ? 0 : 1);}
}

  

二、Reducejoin案例

  1.需求:同上的两个数据文件,要求将订单表中的商品id替换成对应的商品名称。

  2.解决思路:封装TableBean类,包含属性:时间、商品id、订单id、商品名称、flag(flag用来判断是哪张表),

    使用Mapper读两张表,通过context对象获取切片对象,然后通过切片获取切片名称和路径的字符串来判断是哪张表,再将切片的数据封装到TableBean对象,最后以产品id为key、TableBean对象为value传输到Reducer端;

    Reducer接收数据后通过flag判断是哪张表,因为一个reduce中的所有数据的key是相同的,将商品表的商品id和商品名称读入到一个TableBean对象中,然后将订单表的中的数据读入到TableBean类型的ArrayList对象中,然后将ArrayList中的每个TableBean的商品id替换为商品名称,然后遍历该数组以TableBean为key输出。

  3.代码如下:

/*** @author: PrincessHug* @date: 2019/3/30, 2:37* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class TableBean implements Writable {private String timeStamp;private String productId;private String orderId;private String productName;private String flag;public TableBean() {}public String getTimeStamp() {return timeStamp;}public void setTimeStamp(String timeStamp) {this.timeStamp = timeStamp;}public String getProductId() {return productId;}public void setProductId(String productId) {this.productId = productId;}public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getProductName() {return productName;}public void setProductName(String productName) {this.productName = productName;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(timeStamp);out.writeUTF(productId);out.writeUTF(orderId);out.writeUTF(productName);out.writeUTF(flag);}@Overridepublic void readFields(DataInput in) throws IOException {timeStamp = in.readUTF();productId = in.readUTF();orderId = in.readUTF();productName = in.readUTF();flag = in.readUTF();}@Overridepublic String toString() {return timeStamp + "\t" + productName + "\t" + orderId;}
}public class TableMapper extends Mapper<LongWritable, Text,Text,TableBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//通过切片获取文件信息FileSplit split = (FileSplit) context.getInputSplit();String name = split.getPath().getName();//获取一行数据、定义TableBean对象String line = value.toString();TableBean tb = new TableBean();Text t = new Text();//判断是哪一张表if (name.contains("order.txt")){String[] fields = line.split("\t");tb.setTimeStamp(fields[0]);tb.setProductId(fields[1]);tb.setOrderId(fields[2]);tb.setProductName("");tb.setFlag("0");t.set(fields[1]);}else {String[] fields = line.split("\t");tb.setTimeStamp("");tb.setProductId(fields[0]);tb.setOrderId("");tb.setProductName(fields[1]);tb.setFlag("1");t.set(fields[0]);}context.write(t,tb);}
}public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {//分别创建用来存储订单表和产品表的集合ArrayList<TableBean> orderBean = new ArrayList<>();TableBean productBean = new TableBean();//遍历values,通过flag判断是产品表还是订单表for (TableBean v:values){if (v.getFlag().equals("0")){TableBean tableBean = new TableBean();try {BeanUtils.copyProperties(tableBean,v);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}orderBean.add(tableBean);}else {try {BeanUtils.copyProperties(productBean,v);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}//拼接表for (TableBean ob:orderBean) {ob.setProductName(productBean.getProductName());context.write(ob,NullWritable.get());}}
}public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//jar包job.setJarByClass(TableDriver.class);//Mapper、Reducerjob.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);//Mapper输出数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);//Reducer输出数据类型job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);//输入输出路径FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\reducejoin\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\reducejoin\\out"));//提交任务if (job.waitForCompletion(true)){System.out.println("运行完成!");}else {System.out.println("运行失败!");}}
}

  

 

转载于:https://www.cnblogs.com/HelloBigTable/p/10668306.html

这篇关于Mapjoin和Reducejoin案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/491433

相关文章

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

springboot循环依赖问题案例代码及解决办法

《springboot循环依赖问题案例代码及解决办法》在SpringBoot中,如果两个或多个Bean之间存在循环依赖(即BeanA依赖BeanB,而BeanB又依赖BeanA),会导致Spring的... 目录1. 什么是循环依赖?2. 循环依赖的场景案例3. 解决循环依赖的常见方法方法 1:使用 @La

MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固 通俗易懂版)

《MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固通俗易懂版)》本文主要讲解了MySQL中的多表查询,包括子查询、笛卡尔积、自连接、多表查询的实现方法以及多列子查询等,通过实际例子和操... 目录复合查询1. 回顾查询基本操作group by 分组having1. 显示部门号为10的部门名,员

Python爬虫selenium验证之中文识别点选+图片验证码案例(最新推荐)

《Python爬虫selenium验证之中文识别点选+图片验证码案例(最新推荐)》本文介绍了如何使用Python和Selenium结合ddddocr库实现图片验证码的识别和点击功能,感兴趣的朋友一起看... 目录1.获取图片2.目标识别3.背景坐标识别3.1 ddddocr3.2 打码平台4.坐标点击5.图

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

SpringBoot实现动态插拔的AOP的完整案例

《SpringBoot实现动态插拔的AOP的完整案例》在现代软件开发中,面向切面编程(AOP)是一种非常重要的技术,能够有效实现日志记录、安全控制、性能监控等横切关注点的分离,在传统的AOP实现中,切... 目录引言一、AOP 概述1.1 什么是 AOP1.2 AOP 的典型应用场景1.3 为什么需要动态插

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD