Hadoop MapReduce中如何处理跨行Block和inputSplit

2024-08-25 06:48

本文主要是介绍Hadoop MapReduce中如何处理跨行Block和inputSplit,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hadoop MapReduce中如何处理跨行Block和inputSplit
http://www.aboutyun.com/forum.php?mod=viewthread&tid=7704
(出处: about云开发)


Hadoop的初学者经常会疑惑这样两个问题:
1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?
对于上面的两个问题,首先要明确两个概念:Block和InputSplit
1. block是hdfs存储文件的单位(默认是64M);
2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

public List<InputSplit> getSplits(JobContext job) throws IOException {  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  long maxSize = getMaxSplitSize(job);  // generate splits  List<InputSplit> splits = new ArrayList<InputSplit>();  List<FileStatus> files = listStatus(job);        for (FileStatus file: files) {  Path path = file.getPath();  long length = file.getLen();  if (length != 0) {  FileSystem fs = path.getFileSystem(job.getConfiguration());  BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  if (isSplitable(job, path)) {  long blockSize = file.getBlockSize();  long splitSize = computeSplitSize(blockSize, minSize, maxSize);  long bytesRemaining = length;  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  splits.add(makeSplit(path, length-bytesRemaining, splitSize,  blkLocations[blkIndex].getHosts()));  bytesRemaining -= splitSize;  }  if (bytesRemaining != 0) {  splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  blkLocations[blkLocations.length-1].getHosts()));  }  } else { // not splitable  splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));  }  } else {   //Create empty hosts array for zero length files  splits.add(makeSplit(path, 0, length, new String[0]));  }  }  // Save the number of input files for metrics/loadgen  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  LOG.debug("Total # of splits: " + splits.size());  return splits;  
}  

复制代码

从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize
对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。
FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

while (getFilePosition() <= end) {  newSize = in.readLine(value, maxLineLength,  Math.max(maxBytesToConsume(pos), maxLineLength));  if (newSize == 0) {  break;  }  

复制代码
其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

public int readLine(Text str, int maxLineLength,  int maxBytesToConsume) throws IOException {  if (this.recordDelimiterBytes != null) {  return readCustomLine(str, maxLineLength, maxBytesToConsume);  } else {  return readDefaultLine(str, maxLineLength, maxBytesToConsume);  }  
}  /** * Read a line terminated by one of CR, LF, or CRLF. */  
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)  
throws IOException {  str.clear();  int txtLength = 0; //tracks str.getLength(), as an optimization  int newlineLength = 0; //length of terminating newline  boolean prevCharCR = false; //true of prev char was CR  long bytesConsumed = 0;  do {  int startPosn = bufferPosn; //starting from where we left off the last time  if (bufferPosn >= bufferLength) {  startPosn = bufferPosn = 0;  if (prevCharCR)  ++bytesConsumed; //account for CR from previous read  bufferLength = in.read(buffer);  if (bufferLength <= 0)  break; // EOF  }  for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline  if (buffer[bufferPosn] == LF) {  newlineLength = (prevCharCR) ? 2 : 1;  ++bufferPosn; // at next invocation proceed from following byte  break;  }  if (prevCharCR) { //CR + notLF, we are at notLF  newlineLength = 1;  break;  }  prevCharCR = (buffer[bufferPosn] == CR);  }  int readLength = bufferPosn - startPosn;  if (prevCharCR && newlineLength == 0)  --readLength; //CR at the end of the buffer  bytesConsumed += readLength;  int appendLength = readLength - newlineLength;  if (appendLength > maxLineLength - txtLength) {  appendLength = maxLineLength - txtLength;  }  if (appendLength > 0) {  str.append(buffer, startPosn, appendLength);  txtLength += appendLength;  }  } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   <span style="color: #ff0000;">//①</span>  if (bytesConsumed > (long)Integer.MAX_VALUE)  throw new IOException("Too many bytes before newline: " + bytesConsumed);      return (int)bytesConsumed;  
}  

复制代码

我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:
while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:
1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?
2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?
为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

public boolean nextKeyValue() throws IOException {  if (key == null) {  key = new LongWritable();  }  key.set(pos);  if (value == null) {  value = new Text();  }  int newSize = 0;  // We always read one extra line, which lies outside the upper  // split limit i.e. (end - 1)  while (getFilePosition() <= end) {        <span style="color: #ff0000;"> //②</span>  newSize = in.readLine(value, maxLineLength,  Math.max(maxBytesToConsume(pos), maxLineLength));  if (newSize == 0) {  break;  }  pos += newSize;  inputByteCounter.increment(newSize);  if (newSize < maxLineLength) {  break;  }  // line too long. try again  LOG.info("Skipped line of size " + newSize + " at pos " +   (pos - newSize));  }  if (newSize == 0) {  key = null;  value = null;  return false;  } else {  return true;  }  
}  

复制代码

过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。
再来看LineRecordReader的initialize方法:

// If this is not the first split, we always throw away first record  
// because we always (except the last split) read one extra line in  
// next() method.  
if (start != 0) {  start += in.readLine(new Text(), 0, maxBytesToConsume(start));  
}  
this.pos = start;  

复制代码
如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

个人理解

这篇关于Hadoop MapReduce中如何处理跨行Block和inputSplit的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1104847

相关文章

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

Thymeleaf:生成静态文件及异常处理java.lang.NoClassDefFoundError: ognl/PropertyAccessor

我们需要引入包: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>sp

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

明明的随机数处理问题分析与解决方案

明明的随机数处理问题分析与解决方案 引言问题描述解决方案数据结构设计具体步骤伪代码C语言实现详细解释读取输入去重操作排序操作输出结果复杂度分析 引言 明明生成了N个1到500之间的随机整数,我们需要对这些整数进行处理,删去重复的数字,然后进行排序并输出结果。本文将详细讲解如何通过算法、数据结构以及C语言来解决这个问题。我们将会使用数组和哈希表来实现去重操作,再利用排序算法对结果

8. 自然语言处理中的深度学习:从词向量到BERT

引言 深度学习在自然语言处理(NLP)领域的应用极大地推动了语言理解和生成技术的发展。通过从词向量到预训练模型(如BERT)的演进,NLP技术在机器翻译、情感分析、问答系统等任务中取得了显著成果。本篇博文将探讨深度学习在NLP中的核心技术,包括词向量、序列模型(如RNN、LSTM),以及BERT等预训练模型的崛起及其实际应用。 1. 词向量的生成与应用 词向量(Word Embedding)