大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件

本文主要是介绍大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、原理:

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?

InputFormat其实是一个接口,包含了两个方法:

public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)  throws IOException;}

这两个方法有分别完成着以下工作:

      方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M

     方法 getRecordReader 将每个 split  解析成records, 再依次将record解析成<K,V>对

也就是说 InputFormat完成以下工作:   InputFile -->  splits  -->  <K,V>

(一)系统常用的  InputFormat 又有哪些呢?

                      

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,该行内容>

(二)自定义   InputFormat 

 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

然而系统所提供的这几种固定的将  InputFile转换为 <K,V>的方式有时候并不能满足我们的需求:

此时需要我们自定义   InputFormat ,从而使Hadoop框架按照我们预设的方式来将 InputFile解析为<K,V>

在领会自定义   InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

  •       FileInputFormat implements  InputFormat
  •       TextInputFormat extends  FileInputFormat

RecordReader (interface), Line RecordReader(class)的关系

  •       TextInputFormat.get RecordReader calls  Line RecordReader
  •       Line RecordReader  implements  RecordReader

对于InputFormat接口,上面已经有详细的描述

(三)FileInputFormat

再看看 FileInputFormat,

它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现

getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader

  public interface RecordReader<K, V> {boolean   next(K key, V value) throws IOException;K   createKey();V   createValue();long   getPos() throws IOException;public void   close() throws IOException;float   getProgress() throws IOException;
}

 

    因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

     定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

 

二、代码:

 

package MyInputFormat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {@SuppressWarnings("deprecation")@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new TrackRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}}

 

 

package MyInputFormat;import java.io.IOException;
import java.io.InputStream;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** Treats keys as offset in file and value as line.* * @deprecated Use*             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}*             instead.*/
public class TrackRecordReader extends RecordReader<LongWritable, Text> {private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private NewLineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// ----------------------// 行分隔符,即一条记录的分隔符private byte[] separator = "END\n".getBytes();// --------------------public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {in = new NewLineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;this.start -= separator.length;//// --start;fileIn.seek(start);}in = new NewLineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize < maxLineLength) {break;}LOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}@Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}/*** Get the progress within the split*/public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized void close() throws IOException {if (in != null) {in.close();}}public class NewLineReader {private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte[] buffer;private int bufferLength = 0;private int bufferPosn = 0;public NewLineReader(InputStream in) {this(in, DEFAULT_BUFFER_SIZE);}public NewLineReader(InputStream in, int bufferSize) {this.in = in;this.bufferSize = bufferSize;this.buffer = new byte[this.bufferSize];}public NewLineReader(InputStream in, Configuration conf)throws IOException {this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));}public void close() throws IOException {in.close();}public int readLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {str.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do {// 已经读到buffer的末尾了,读下一个bufferif (this.bufferPosn >= this.bufferLength) {bufferPosn = 0;bufferLength = in.read(buffer);// 读到文件末尾了,则跳出,进行下一个文件的读取if (bufferLength <= 0) {break;}}int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn++) {// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {sepPosn = 0;}// 遇到行分隔符的第一个字符if (buffer[bufferPosn] == separator[sepPosn]) {bufferPosn++;int i = 0;// 判断接下来的字符是否也是行分隔符中的字符for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半if (bufferPosn + i >= bufferLength) {bufferPosn += i - 1;break;}// 一旦其中有一个字符不相同,就判定为不是分隔符if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {sepPosn = 0;break;}}// 的确遇到了行分隔符if (sepPosn == separator.length) {bufferPosn += i;newline = true;sepPosn = 0;break;}}}int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;// 行分隔符不放入块中if (readLength > maxLineLength - txtLength) {readLength = maxLineLength - txtLength;}if (readLength > 0) {record.append(this.buffer, startPosn, readLength);txtLength += readLength;// 去掉记录的分隔符if (newline) {str.set(record.getBytes(), 0, record.getLength()- separator.length);}}} while (!newline && (bytesConsumed < maxBytesToConsume));if (bytesConsumed > (long) Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: "+ bytesConsumed);}return (int) bytesConsumed;}public int readLine(Text str, int maxLineLength) throws IOException {return readLine(str, maxLineLength, Integer.MAX_VALUE);}public int readLine(Text str) throws IOException {return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);}}
}

 

 

package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {System.out.println("key:\t " + key);System.out.println("value:\t " + value);System.out.println("-------------------------");}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Path outPath = new Path("/hive/11");FileSystem.get(conf).delete(outPath, true);Job job = new Job(conf, "TestMyInputFormat");job.setInputFormatClass(TrackInputFormat.class);job.setJarByClass(TestMyInputFormat.class);job.setMapperClass(TestMyInputFormat.MapperClass.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

三、测试数据:

  cookieId    time     url                 cookieOverFlag

 

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

 

四、结果:

 

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

这篇关于大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/727285

相关文章

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值

Rust中的BoxT之堆上的数据与递归类型详解

《Rust中的BoxT之堆上的数据与递归类型详解》本文介绍了Rust中的BoxT类型,包括其在堆与栈之间的内存分配,性能优势,以及如何利用BoxT来实现递归类型和处理大小未知类型,通过BoxT,Rus... 目录1. Box<T> 的基础知识1.1 堆与栈的分工1.2 性能优势2.1 递归类型的问题2.2

Python使用Pandas对比两列数据取最大值的五种方法

《Python使用Pandas对比两列数据取最大值的五种方法》本文主要介绍使用Pandas对比两列数据取最大值的五种方法,包括使用max方法、apply方法结合lambda函数、函数、clip方法、w... 目录引言一、使用max方法二、使用apply方法结合lambda函数三、使用np.maximum函数

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate