大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件

本文主要是介绍大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、原理:

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?

InputFormat其实是一个接口,包含了两个方法:

public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)  throws IOException;}

这两个方法有分别完成着以下工作:

      方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M

     方法 getRecordReader 将每个 split  解析成records, 再依次将record解析成<K,V>对

也就是说 InputFormat完成以下工作:   InputFile -->  splits  -->  <K,V>

(一)系统常用的  InputFormat 又有哪些呢?

                      

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,该行内容>

(二)自定义   InputFormat 

 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

然而系统所提供的这几种固定的将  InputFile转换为 <K,V>的方式有时候并不能满足我们的需求:

此时需要我们自定义   InputFormat ,从而使Hadoop框架按照我们预设的方式来将 InputFile解析为<K,V>

在领会自定义   InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

  •       FileInputFormat implements  InputFormat
  •       TextInputFormat extends  FileInputFormat

RecordReader (interface), Line RecordReader(class)的关系

  •       TextInputFormat.get RecordReader calls  Line RecordReader
  •       Line RecordReader  implements  RecordReader

对于InputFormat接口,上面已经有详细的描述

(三)FileInputFormat

再看看 FileInputFormat,

它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现

getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader

  public interface RecordReader<K, V> {boolean   next(K key, V value) throws IOException;K   createKey();V   createValue();long   getPos() throws IOException;public void   close() throws IOException;float   getProgress() throws IOException;
}

 

    因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

     定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

 

二、代码:

 

package MyInputFormat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {@SuppressWarnings("deprecation")@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new TrackRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}}

 

 

package MyInputFormat;import java.io.IOException;
import java.io.InputStream;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** Treats keys as offset in file and value as line.* * @deprecated Use*             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}*             instead.*/
public class TrackRecordReader extends RecordReader<LongWritable, Text> {private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private NewLineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// ----------------------// 行分隔符,即一条记录的分隔符private byte[] separator = "END\n".getBytes();// --------------------public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {in = new NewLineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;this.start -= separator.length;//// --start;fileIn.seek(start);}in = new NewLineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize < maxLineLength) {break;}LOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}@Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}/*** Get the progress within the split*/public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized void close() throws IOException {if (in != null) {in.close();}}public class NewLineReader {private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte[] buffer;private int bufferLength = 0;private int bufferPosn = 0;public NewLineReader(InputStream in) {this(in, DEFAULT_BUFFER_SIZE);}public NewLineReader(InputStream in, int bufferSize) {this.in = in;this.bufferSize = bufferSize;this.buffer = new byte[this.bufferSize];}public NewLineReader(InputStream in, Configuration conf)throws IOException {this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));}public void close() throws IOException {in.close();}public int readLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {str.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do {// 已经读到buffer的末尾了,读下一个bufferif (this.bufferPosn >= this.bufferLength) {bufferPosn = 0;bufferLength = in.read(buffer);// 读到文件末尾了,则跳出,进行下一个文件的读取if (bufferLength <= 0) {break;}}int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn++) {// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {sepPosn = 0;}// 遇到行分隔符的第一个字符if (buffer[bufferPosn] == separator[sepPosn]) {bufferPosn++;int i = 0;// 判断接下来的字符是否也是行分隔符中的字符for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半if (bufferPosn + i >= bufferLength) {bufferPosn += i - 1;break;}// 一旦其中有一个字符不相同,就判定为不是分隔符if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {sepPosn = 0;break;}}// 的确遇到了行分隔符if (sepPosn == separator.length) {bufferPosn += i;newline = true;sepPosn = 0;break;}}}int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;// 行分隔符不放入块中if (readLength > maxLineLength - txtLength) {readLength = maxLineLength - txtLength;}if (readLength > 0) {record.append(this.buffer, startPosn, readLength);txtLength += readLength;// 去掉记录的分隔符if (newline) {str.set(record.getBytes(), 0, record.getLength()- separator.length);}}} while (!newline && (bytesConsumed < maxBytesToConsume));if (bytesConsumed > (long) Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: "+ bytesConsumed);}return (int) bytesConsumed;}public int readLine(Text str, int maxLineLength) throws IOException {return readLine(str, maxLineLength, Integer.MAX_VALUE);}public int readLine(Text str) throws IOException {return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);}}
}

 

 

package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {System.out.println("key:\t " + key);System.out.println("value:\t " + value);System.out.println("-------------------------");}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Path outPath = new Path("/hive/11");FileSystem.get(conf).delete(outPath, true);Job job = new Job(conf, "TestMyInputFormat");job.setInputFormatClass(TrackInputFormat.class);job.setJarByClass(TestMyInputFormat.class);job.setMapperClass(TestMyInputFormat.MapperClass.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

三、测试数据:

  cookieId    time     url                 cookieOverFlag

 

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

 

四、结果:

 

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

这篇关于大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/727285

相关文章

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Java实现自定义table宽高的示例代码

《Java实现自定义table宽高的示例代码》在桌面应用、管理系统乃至报表工具中,表格(JTable)作为最常用的数据展示组件,不仅承载对数据的增删改查,还需要配合布局与视觉需求,而JavaSwing... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam