2.2.11 hadoop体系之离线计算-mapreduce分布式计算-案例:Reduce端实现Join

本文主要是介绍2.2.11 hadoop体系之离线计算-mapreduce分布式计算-案例:Reduce端实现Join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.需求分析

2.数据展示

3.实现机制

3.1 ReduceJoinMapper:定义Mapper

3.2 ReduceJoinReducer:定义Reducer

3.3 JobMain:定义Main方法

4.运行并查看结果

4.1 准备数据

4.2 运行结果


1.需求分析

假如数据量巨大,两表的数据是以文件的形式存储在 HDFS 中, 需要用 MapReduce 程 序来实现以下 SQL 查询运算

select a.id,a.date,b.name,b.category_id,b.price from t_order a left
join t_product b on a.pid = b.id

2.数据展示

3.实现机制

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联。

代码结构:

 

3.1 ReduceJoinMapper:定义Mapper

package ucas.mapreduce_reduce_join;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//首先判断数据来自哪个文件FileSplit fileSplit = (FileSplit) context.getInputSplit();String fileName = fileSplit.getPath().getName();if(fileName.equals("orders.txt")){//获取pidString[] split = value.toString().split(",");context.write(new Text(split[2]), value);}else{//获取pidString[] split = value.toString().split(",");context.write(new Text(split[0]), value);}}
}

3.2 ReduceJoinReducer:定义Reducer

package ucas.mapreduce_reduce_join;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class ReduceJoinReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {String first = "";String second = "";for (Text value : values) {if (value.toString().startsWith("p")) {first = value.toString();} else {second = value.toString();}}if (first.equals("")) {context.write(key, new Text("NULL" + "\t" + second));} else {context.write(key, new Text(first + "\t" + second));}}
}

3.3 JobMain:定义Main方法

package ucas.mapreduce_reduce_join;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class JobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {//创建一个任务对象Job job = Job.getInstance(super.getConf(), "mapreduce_reduce_join");//打包放在集群运行时,需要做一个配置job.setJarByClass(JobMain.class);//第一步:设置读取文件的类: K1 和V1job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/reduce_join"));//第二步:设置Mapper类job.setMapperClass(ReduceJoinMapper.class);//设置Map阶段的输出类型: k2 和V2的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//第三,四,五,六步采用默认方式(分区,排序,规约,分组)//第七步 :设置文的Reducer类job.setReducerClass(ReduceJoinReducer.class);//设置Reduce阶段的输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//第八步:设置输出类job.setOutputFormatClass(TextOutputFormat.class);//设置输出的路径TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/out/reduce_join_out"));boolean b = job.waitForCompletion(true);return b ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//启动一个任务int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}}

4.运行并查看结果

4.1 准备数据

4.2 运行结果

运行命令:hadoop jar day04_mapreduce_combiner-1.0-SNAPSHOT.jar ucas.mapreduce_reduce_join.JobMain

可以看到,我们实现了联合查询操作,还是比较简单的。

这篇关于2.2.11 hadoop体系之离线计算-mapreduce分布式计算-案例:Reduce端实现Join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754520

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time