大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件

本文主要是介绍大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、原理:

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?

InputFormat其实是一个接口,包含了两个方法:

public interface InputFormat<K, V> {InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)  throws IOException;}

这两个方法有分别完成着以下工作:

      方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M

     方法 getRecordReader 将每个 split  解析成records, 再依次将record解析成<K,V>对

也就是说 InputFormat完成以下工作:   InputFile -->  splits  -->  <K,V>

(一)系统常用的  InputFormat 又有哪些呢?

                      

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,该行内容>

(二)自定义   InputFormat 

 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

然而系统所提供的这几种固定的将  InputFile转换为 <K,V>的方式有时候并不能满足我们的需求:

此时需要我们自定义   InputFormat ,从而使Hadoop框架按照我们预设的方式来将 InputFile解析为<K,V>

在领会自定义   InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

  •       FileInputFormat implements  InputFormat
  •       TextInputFormat extends  FileInputFormat

RecordReader (interface), Line RecordReader(class)的关系

  •       TextInputFormat.get RecordReader calls  Line RecordReader
  •       Line RecordReader  implements  RecordReader

对于InputFormat接口,上面已经有详细的描述

(三)FileInputFormat

再看看 FileInputFormat,

它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现

getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader

  public interface RecordReader<K, V> {boolean   next(K key, V value) throws IOException;K   createKey();V   createValue();long   getPos() throws IOException;public void   close() throws IOException;float   getProgress() throws IOException;
}

 

    因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,

     定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader

 

二、代码:

 

package MyInputFormat;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {@SuppressWarnings("deprecation")@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {return new TrackRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}}

 

 

package MyInputFormat;import java.io.IOException;
import java.io.InputStream;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;/*** Treats keys as offset in file and value as line.* * @deprecated Use*             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}*             instead.*/
public class TrackRecordReader extends RecordReader<LongWritable, Text> {private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private NewLineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;// ----------------------// 行分隔符,即一条记录的分隔符private byte[] separator = "END\n".getBytes();// --------------------public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);FileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());boolean skipFirstLine = false;if (codec != null) {in = new NewLineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;this.start -= separator.length;//// --start;fileIn.seek(start);}in = new NewLineReader(fileIn, job);}if (skipFirstLine) { // skip first line and re-establish "start".start += in.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;while (pos < end) {newSize = in.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0) {break;}pos += newSize;if (newSize < maxLineLength) {break;}LOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}@Overridepublic LongWritable getCurrentKey() {return key;}@Overridepublic Text getCurrentValue() {return value;}/*** Get the progress within the split*/public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized void close() throws IOException {if (in != null) {in.close();}}public class NewLineReader {private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte[] buffer;private int bufferLength = 0;private int bufferPosn = 0;public NewLineReader(InputStream in) {this(in, DEFAULT_BUFFER_SIZE);}public NewLineReader(InputStream in, int bufferSize) {this.in = in;this.bufferSize = bufferSize;this.buffer = new byte[this.bufferSize];}public NewLineReader(InputStream in, Configuration conf)throws IOException {this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));}public void close() throws IOException {in.close();}public int readLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {str.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do {// 已经读到buffer的末尾了,读下一个bufferif (this.bufferPosn >= this.bufferLength) {bufferPosn = 0;bufferLength = in.read(buffer);// 读到文件末尾了,则跳出,进行下一个文件的读取if (bufferLength <= 0) {break;}}int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn++) {// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {sepPosn = 0;}// 遇到行分隔符的第一个字符if (buffer[bufferPosn] == separator[sepPosn]) {bufferPosn++;int i = 0;// 判断接下来的字符是否也是行分隔符中的字符for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半if (bufferPosn + i >= bufferLength) {bufferPosn += i - 1;break;}// 一旦其中有一个字符不相同,就判定为不是分隔符if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {sepPosn = 0;break;}}// 的确遇到了行分隔符if (sepPosn == separator.length) {bufferPosn += i;newline = true;sepPosn = 0;break;}}}int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;// 行分隔符不放入块中if (readLength > maxLineLength - txtLength) {readLength = maxLineLength - txtLength;}if (readLength > 0) {record.append(this.buffer, startPosn, readLength);txtLength += readLength;// 去掉记录的分隔符if (newline) {str.set(record.getBytes(), 0, record.getLength()- separator.length);}}} while (!newline && (bytesConsumed < maxBytesToConsume));if (bytesConsumed > (long) Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: "+ bytesConsumed);}return (int) bytesConsumed;}public int readLine(Text str, int maxLineLength) throws IOException {return readLine(str, maxLineLength, Integer.MAX_VALUE);}public int readLine(Text str) throws IOException {return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);}}
}

 

 

package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {System.out.println("key:\t " + key);System.out.println("value:\t " + value);System.out.println("-------------------------");}}public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();Path outPath = new Path("/hive/11");FileSystem.get(conf).delete(outPath, true);Job job = new Job(conf, "TestMyInputFormat");job.setInputFormatClass(TrackInputFormat.class);job.setJarByClass(TestMyInputFormat.class);job.setMapperClass(TestMyInputFormat.MapperClass.class);job.setNumReduceTasks(0);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(args[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

 

三、测试数据:

  cookieId    time     url                 cookieOverFlag

 

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END

 

四、结果:

 

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

这篇关于大数据【五十六】【转】自定义 hadoop MapReduce InputFormat 切分输入文件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/727285

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数