大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件

本文主要是介绍大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、原理:

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?

InputFormat其实是一个接口,包含了两个方法:

public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)  throws IOException;}

这两个方法有分别完成着以下工作:

      方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M

     方法 getRecordReader 将每个 split  解析成records, 再依次将record解析成<K,V>对

也就是说 InputFormat完成以下工作:   InputFile -->  splits  -->  <K,V>

(一)系统常用的  InputFormat 又有哪些呢?

                      

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,该行内容>

(二)自定义   InputFormat 

 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

然而系统所提供的这几种固定的将  InputFile转换为 <K,V>的方式有时候并不能满足我们的需求:

此时需要我们自定义   InputFormat ,从而使Hadoop框架按照我们预设的方式来将 InputFile解析为<K,V>

在领会自定义   InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

  •       FileInputFormat implements  InputFormat
  •       TextInputFormat extends  FileInputFormat

RecordReader (interface), Line RecordReader(class)的关系

  •       TextInputFormat.get RecordReader calls  Line RecordReader
  •       Line RecordReader  implements  RecordReader

对于InputFormat接口,上面已经有详细的描述

(三)FileInputFormat

再看看 FileInputFormat,

它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现

getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader

  public interface RecordReader<K, V> {boolean   next(K key, V value) throws IOException;K   createKey();V   createValue();long   getPos() throws IOException;public void   close() throws IOException;float   getProgress() throws IOException;
}

 

    因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

     定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

 

二、代码:

 

package MyInputFormat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {@SuppressWarnings("deprecation")@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new TrackRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}}

 

 

package MyInputFormat;import java.io.IOException;
import java.io.InputStream;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** Treats keys as offset in file and value as line.* * @deprecated Use*             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}*             instead.*/
public class TrackRecordReader extends RecordReader<LongWritable, Text> {private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private NewLineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// ----------------------// 行分隔符,即一条记录的分隔符private byte[] separator = "END\n".getBytes();// --------------------public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {in = new NewLineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;this.start -= separator.length;//// --start;fileIn.seek(start);}in = new NewLineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize < maxLineLength) {break;}LOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}@Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}/*** Get the progress within the split*/public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized void close() throws IOException {if (in != null) {in.close();}}public class NewLineReader {private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte[] buffer;private int bufferLength = 0;private int bufferPosn = 0;public NewLineReader(InputStream in) {this(in, DEFAULT_BUFFER_SIZE);}public NewLineReader(InputStream in, int bufferSize) {this.in = in;this.bufferSize = bufferSize;this.buffer = new byte[this.bufferSize];}public NewLineReader(InputStream in, Configuration conf)throws IOException {this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));}public void close() throws IOException {in.close();}public int readLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {str.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do {// 已经读到buffer的末尾了,读下一个bufferif (this.bufferPosn >= this.bufferLength) {bufferPosn = 0;bufferLength = in.read(buffer);// 读到文件末尾了,则跳出,进行下一个文件的读取if (bufferLength <= 0) {break;}}int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn++) {// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {sepPosn = 0;}// 遇到行分隔符的第一个字符if (buffer[bufferPosn] == separator[sepPosn]) {bufferPosn++;int i = 0;// 判断接下来的字符是否也是行分隔符中的字符for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半if (bufferPosn + i >= bufferLength) {bufferPosn += i - 1;break;}// 一旦其中有一个字符不相同,就判定为不是分隔符if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {sepPosn = 0;break;}}// 的确遇到了行分隔符if (sepPosn == separator.length) {bufferPosn += i;newline = true;sepPosn = 0;break;}}}int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;// 行分隔符不放入块中if (readLength > maxLineLength - txtLength) {readLength = maxLineLength - txtLength;}if (readLength > 0) {record.append(this.buffer, startPosn, readLength);txtLength += readLength;// 去掉记录的分隔符if (newline) {str.set(record.getBytes(), 0, record.getLength()- separator.length);}}} while (!newline && (bytesConsumed < maxBytesToConsume));if (bytesConsumed > (long) Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: "+ bytesConsumed);}return (int) bytesConsumed;}public int readLine(Text str, int maxLineLength) throws IOException {return readLine(str, maxLineLength, Integer.MAX_VALUE);}public int readLine(Text str) throws IOException {return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);}}
}

 

 

package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {System.out.println("key:\t " + key);System.out.println("value:\t " + value);System.out.println("-------------------------");}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Path outPath = new Path("/hive/11");FileSystem.get(conf).delete(outPath, true);Job job = new Job(conf, "TestMyInputFormat");job.setInputFormatClass(TrackInputFormat.class);job.setJarByClass(TestMyInputFormat.class);job.setMapperClass(TestMyInputFormat.MapperClass.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

三、测试数据:

  cookieId    time     url                 cookieOverFlag

 

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

 

四、结果:

 

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

这篇关于大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/727285

相关文章

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

鸿蒙中Axios数据请求的封装和配置方法

《鸿蒙中Axios数据请求的封装和配置方法》:本文主要介绍鸿蒙中Axios数据请求的封装和配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.配置权限 应用级权限和系统级权限2.配置网络请求的代码3.下载在Entry中 下载AxIOS4.封装Htt

使用Python实现一键隐藏屏幕并锁定输入

《使用Python实现一键隐藏屏幕并锁定输入》本文主要介绍了使用Python编写一个一键隐藏屏幕并锁定输入的黑科技程序,能够在指定热键触发后立即遮挡屏幕,并禁止一切键盘鼠标输入,这样就再也不用担心自己... 目录1. 概述2. 功能亮点3.代码实现4.使用方法5. 展示效果6. 代码优化与拓展7. 总结1.

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,