大数据技术之_05_Hadoop学习_03_MapReduce_MapTask工作机制+ReduceTask工作机制+OutputFormat数据输出+Join多种应用+计数器应用+数据清洗(ETL)

本文主要是介绍大数据技术之_05_Hadoop学习_03_MapReduce_MapTask工作机制+ReduceTask工作机制+OutputFormat数据输出+Join多种应用+计数器应用+数据清洗(ETL),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据技术之_05_Hadoop学习_03_MapReduce

        • 3.3.4 WritableComparable排序
        • 3.3.5 WritableComparable排序案例实操(全排序)
        • 3.3.6 WritableComparable排序案例实操(区内排序)
        • 3.3.7 Combiner合并
        • 3.3.8 Combiner合并案例实操
        • 3.3.9 GroupingComparator分组(辅助排序/分组排序)
        • 3.3.10 GroupingComparator分组案例实操
      • 3.4 MapTask工作机制
      • 3.5 ReduceTask工作机制
      • 3.6 OutputFormat数据输出
        • 3.6.1 OutputFormat接口实现类
        • 3.6.2 自定义OutputFormat
        • 3.6.3 自定义OutputFormat案例实操
      • 3.7 Join多种应用
        • 3.7.1 Reduce Join
        • 3.7.2 Reduce Join案例实操
        • 3.7.3 Map Join
        • 3.7.4 Map Join案例实操
      • 3.8 计数器应用
      • 3.9 数据清洗(ETL)
        • 3.9.1 数据清洗案例实操-简单解析版
        • 3.9.2 数据清洗案例实操-复杂解析版
        • 3.10 MapReduce开发总结
    • 第4章 Hadoop数据压缩
      • 4.1 概述
      • 4.2 MR支持的压缩编码
      • 4.3 压缩方式选择
        • 4.3.1 Gzip压缩
        • 4.3.2 Bzip2压缩
        • 4.3.3 Lzo压缩
        • 4.3.4 Snappy压缩
      • 4.4 压缩位置选择
      • 4.5 压缩参数配置
      • 4.6 压缩实操案例
        • 4.6.1 数据流的压缩和解压缩
        • 4.6.2 Map输出端采用压缩
        • 4.6.3 Reduce输出端采用压缩
    • 第5章 Yarn资源调度器
      • 5.1 Yarn基本架构
      • 5.3 Yarn工作机制
      • 5.4 作业提交全过程
      • 5.5 资源调度器
      • 5.6 任务的推测执行(秘籍)

3.3.4 WritableComparable排序

0、排序概述


1、排序的分类

2、自定义排序WritableComparable
(1)原理分析
  bean对象做为key传输,需要实现WritableComparable接口重写compareTo()方法,就可以实现排序。

@Override
public int compareTo(FlowBean o) {int result;// 按照总流量大小,倒序排列if (sumFlow > bean.getSumFlow()) {result = -1;} else if (sumFlow < bean.getSumFlow()) {result = 1;} else {result = 0;}return result;
}
3.3.5 WritableComparable排序案例实操(全排序)

1、需求
  根据案例2.3产生的结果再次对总流量进行排序。
(1)输入数据

(2)期望输出数据

13509468723     7335	110349	117684
13736230513     2481	24681	27162
13956435636     132		1512	1644
13846544121     264		0		264
......

2、需求分析

3、代码实现
(1)FlowBean对象在在需求1基础上增加了比较功能

package com.atguigu.mr.sort;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; // 上行流量private long downFlow; // 下行流量private long sumFlow; // 总流量/*** 反序列化时,需要反射调用空参构造函数,所以必须有*/public FlowBean() {super();}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}/*** 反序列化方法,注意反序列化的顺序和序列化的顺序完全一致*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}/*** 比较方法*/@Overridepublic int compareTo(FlowBean bean) {int result;// 按照总流量大小,倒序排列if (sumFlow > bean.getSumFlow()) {result = -1;} else if (sumFlow < bean.getSumFlow()) {result = 1;} else {result = 0;}return result;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}
}

(2)编写Mapper类

package com.atguigu.mr.sort;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {FlowBean k = new FlowBean();Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 13736230513	2481	24681	27162// 1、获取一行String line = value.toString();// 2、截取String[] fields = line.split("\t");// 3、封装对象String phoneNum = fields[0];long upFlow = Long.parseLong(fields[1]);long downFlow = Long.parseLong(fields[2]);long sumFlow = Long.parseLong(fields[3]);k.setUpFlow(upFlow);k.setDownFlow(downFlow);k.setSumFlow(sumFlow);v.set(phoneNum);// 4、输出context.write(k, v);}
}

(3)编写Reducer类

package com.atguigu.mr.sort;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {// 循环输出,避免总流量相同的情况for (Text text : values) {context.write(text, key);}}
}

(4)编写Driver类

package com.atguigu.mr.sort;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountSortDriver {public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "d:/temp/atguigu/0529/output2", "d:/temp/atguigu/0529/output8" };// 1、获取配置信息,或者job对象实例Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2、指定本程序的jar包所在的本地路径job.setJarByClass(FlowCountSortDriver.class);// 3、指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);// 4、指定mapper输出数据的kv类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);// 5、指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 6、指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
3.3.6 WritableComparable排序案例实操(区内排序)

1、需求
  要求每个省份手机号输出的文件中按照总流量内部排序。
2、需求分析
  基于前一个需求,增加自定义分区类,分区按照省份手机号设置。

3、案例实操
(1)增加自定义分区类

package com.atguigu.mr.sort;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner<FlowBean, Text> {@Overridepublic int getPartition(FlowBean key, Text value, int numPartitions) {// 按照手机号的前三位进行分区// 获取手机号的前三位String prePhoneNum = value.toString().substring(0, 3);// 根据手机号归属地设置分区int partition = 4;if ("136".equals(prePhoneNum)) {partition = 0;} else if ("137".equals(prePhoneNum)) {partition = 1;} else if ("138".equals(prePhoneNum)) {partition = 2;} else if ("139".equals(prePhoneNum)) {partition = 3;}return partition;}
}

(2)在驱动类中添加加载分区类

    // 加载自定义分区类(即关联分区)job.setPartitionerClass(ProvincePartitioner.class);// 设置Reducetask个数job.setNumReduceTasks(5);
3.3.7 Combiner合并

  Combiner合并是Hadoop框架优化的一种手段,因为Combiner合并减少了数据的IO传输。

(6)自定义Combiner实现步骤
(a)自定义一个Combiner继承Reducer,重写reduce()方法

package com.atguigu.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {int sum;IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 1、汇总,累加求和sum = 0;for (IntWritable value : values) {sum += value.get();}v.set(sum);// 2、写出context.write(key, v);}
}

(b)在Job驱动类中设置:

    job.setCombinerClass(WordcountCombiner.class);
3.3.8 Combiner合并案例实操

1、需求
  统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
(1)数据输入

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2)期望输出数据
  期望:Combine输入数据多,输出时经过合并,输出数据降低。
2、需求分析

3、案例实操-方案一
1)增加一个WordcountCombiner类继承Reducer

package com.atguigu.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {int sum;IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 1、汇总,累加求和sum = 0;for (IntWritable value : values) {sum += value.get();}v.set(sum);// 2、写出context.write(key, v);}
}

2)在WordcountDriver驱动类中指定Combiner

    // 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑job.setCombinerClass(WordcountCombiner.class);

4、案例实操-方案二
1)将WordcountReducer作为Combiner在WordcountDriver驱动类中指定

    // 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑job.setCombinerClass(WordcountReducer.class);

运行程序,如下图所示:

3.3.9 GroupingComparator分组(辅助排序/分组排序)

  对Reduce阶段的数据根据某一个或几个字段进行分组。
分组排序步骤:
(1)自定义类继承WritableComparator
(2)重写compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {// 比较的业务逻辑// ......return result;
}

(3)创建一个构造将比较对象的类传给父类

protected OrderGroupingComparator() {super(OrderBean.class, true);
}
3.3.10 GroupingComparator分组案例实操

1、需求
  有如下订单数据

  现在需要求出每一个订单中最贵的商品。
(1)输入数据
GroupingComparator.txt

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4

(2)期望输出数据

1	222.8
2	722.4
3	232.8

2、需求分析
(1)利用“订单id和成交金额”作为key,可以将Map阶段读取到的所有订单数据按照id升序排序,如果id相同再按照金额降序排序,发送到Reduce。
(2)在Reduce端利用GroupingComparator将订单id相同的kv聚合成组,然后取第一个即是该订单中最贵商品,如下图所示。

3、代码实现
(1)定义订单信息OrderBean类

package com.atguigu.mr.order;import java.io.DataInput;

这篇关于大数据技术之_05_Hadoop学习_03_MapReduce_MapTask工作机制+ReduceTask工作机制+OutputFormat数据输出+Join多种应用+计数器应用+数据清洗(ETL)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1100803

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na