【Hadoop】5.MapReduce框架原理-自定义InputFormat

2023-11-09 13:32

本文主要是介绍【Hadoop】5.MapReduce框架原理-自定义InputFormat,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

自定义InputFormat一般应用于hadoop自带的InputFormat类型不能满足某个应用场景中,需要我们自定义来解决

自定义步骤

自定义Inputformat步骤如下:

  1. 自定义一个类继承InputFormat
  2. 改写RecordReader,实现一次读取一个完成的文件封装为KV
  3. 在输出时使用SequenceFileOutPutFormat输出合并文件SequenceFile文件

(SequenceFile文件是hadoop用来存储二进制形式的key-value对的文件格式,SequenceFile里面存储着很多小文件,存储的形式为文件路径+名称为key,文件内容为value)。

示例操作

步骤:

  1. 自定义一个类继承FileInputFormat
    a. 重写isSpliable()方法,返回false 设置为文件不可分割
    b. 重写createRecordReader(),创建自定义的RecordReader对象,并初始化
  2. 改写RecordReader,实现一次读取一个完整的文件封装成为KV
    a. 采用IO流一次读取一个文件输出到value中,因为设置了不可分割所以整个文件都封装到了value中
    b. 获取文件路径信息+名称作为key
  3. 设置Driver
    a.

代码:
在这里插入图片描述

CustomerInputFormate自定义InputFormat
package com.xing.MapReduce.InputFormatSequenceFile;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/***  自定义InputFormat*/
public class CustomerInputFormate extends FileInputFormat<Text,BytesWritable> {/***  新建自定义的RecordReader* @param inputSplit* @param taskAttemptContext* @return* @throws IOException* @throws InterruptedException*/public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {CustomerRecordReader reader = new CustomerRecordReader();reader.initialize(inputSplit,taskAttemptContext );return reader;}/***  设置为不可分割* @param context* @param filename* @return*/@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}
}
CustomerRecordReader 自定义RecordReader
package com.xing.MapReduce.InputFormatSequenceFile;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/***  自定义RecordReader*/
public class CustomerRecordReader extends RecordReader<Text, BytesWritable> {// 分片信息FileSplit split;Configuration configuration;Text k = new Text();BytesWritable v = new BytesWritable();boolean isProgress = true;//初始化方法public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {this.split = (FileSplit) inputSplit;this.configuration = taskAttemptContext.getConfiguration();}//核心业务处理(对key和value的封装)public boolean nextKeyValue() throws IOException, InterruptedException {if (isProgress){//1. 获取fsPath path = split.getPath();System.out.println("@@@@@@@@@@@@@@@@"+split.getLength());System.out.println("$$$$$$$$$$$$$$$$"+split);FileSystem fileSystem = path.getFileSystem(configuration);//2. 获取输入流FSDataInputStream fis = fileSystem.open(path);//3. 拷贝byte[] bytes = new byte[(int) split.getLength()];IOUtils.readFully(fis, bytes,0 ,bytes.length);//4. 填充k-vv.set(bytes,0 ,bytes.length);k.set(path.toString());//5. 收尾IOUtils.closeStream(fis);isProgress = false;return true;}return false;}// 获取kay值public Text getCurrentKey() throws IOException, InterruptedException {return k;}// 获取value值public BytesWritable getCurrentValue() throws IOException, InterruptedException {return v;}public float getProgress() throws IOException, InterruptedException {return 0;}public void close() throws IOException {}
}
SequenceFileMapper
package com.xing.MapReduce.InputFormatSequenceFile;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class SequenceFileMapper extends Mapper<Text,BytesWritable,Text,BytesWritable> {@Overrideprotected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {// 直接输出 这里的key就是文件路径信息 value就是文本内容context.write(key,value );}}
SequenceFileReducer
package com.xing.MapReduce.InputFormatSequenceFile;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SequenceFileReducer extends Reducer<Text,BytesWritable,Text,BytesWritable> {@Overrideprotected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {// 就是简单的输出内容for (BytesWritable value : values) {context.write(key,value);}}
}
SequenceFileDriver
package com.xing.MapReduce.InputFormatSequenceFile;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;public class SequenceFileDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 获取configuration和FileSystemConfiguration configuration = new Configuration();FileSystem fs = FileSystem.get(configuration);// 设置job的名字和jar包Job job = Job.getInstance(configuration);job.setJobName("SequenceFileDriver");job.setJarByClass(SequenceFileDriver.class);// 设置job的mapper和reduce的处理类名job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);// 设置输入类型和输出类型job.setInputFormatClass(CustomerInputFormate.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);// 设置输出的key和输出的value类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);// 判断输出地址是否存在if (fs.exists(new Path("E:\\hdfs\\output1"))){fs.delete(new Path("E:\\hdfs\\output1"),true );}// 设置输入和输出的文件路径FileInputFormat.setInputPaths(job,new Path("E:\\hdfs\\input"));FileOutputFormat.setOutputPath(job,new Path("E:\\hdfs\\output1"));// 返回结果boolean b = job.waitForCompletion(true);System.exit(b ? 0 : -1);}
}
输入和输出

输入:
在这里插入图片描述
输出:

在这里插入图片描述

这篇关于【Hadoop】5.MapReduce框架原理-自定义InputFormat的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/376420

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个?

跨平台系列 cross-plateform 跨平台应用程序-01-概览 cross-plateform 跨平台应用程序-02-有哪些主流技术栈? cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个? cross-plateform 跨平台应用程序-04-React Native 介绍 cross-plateform 跨平台应用程序-05-Flutte

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

Spring框架5 - 容器的扩展功能 (ApplicationContext)

private static ApplicationContext applicationContext;static {applicationContext = new ClassPathXmlApplicationContext("bean.xml");} BeanFactory的功能扩展类ApplicationContext进行深度的分析。ApplicationConext与 BeanF