Hadoop重点难点:Hadoop IO/压缩/序列化

2023-10-09 00:32

本文主要是介绍Hadoop重点难点:Hadoop IO/压缩/序列化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hadoop – IO

  1. 输入文件从HDFS进行读取.

  2. 输出文件会存入本地磁盘.

  3. Reducer和Mapper间的网络I/O,从Mapper节点得到Reducer的检索文件.

  4. 使用Reducer实例从本地磁盘回读数据.

  5. Reducer输出- 回传到HDFS.

序列化

序列化是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。反序列化是指将字节流转回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储

在Hadoop中,系统中多个节点进程间的通信是通过“远程过程调用”(RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下:

1.紧凑

紧凑格式能充分利用网络带宽(数据中心最稀缺的资源)

2.快速

进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。

3.可扩展

为了满足新的需求,协议不断变化。所以在控制客户端和服务期的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增加新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。

4.支持互操作

对于系统来说,希望能够支持以不同语言写的客户端与服务器交互,所以需要设计一种特定的格式来满足这一需求。

Writable 接口

Writable 接口定义了两个方法:一个将其状态写入 DataOutput 二进制流,另一个从 DataInput二进制流读取状态。

BytesWritable

BytesWritable 是对二进制数据数组的封装。它的序列化格式为一个指定所含数据字节数的整数域(4字节),后跟数据内容的本身。例如,长度为2的字节数组包含数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03和05)

NullWritable

NullWritable 是 writable 的特殊类型,它的序列化长度为0。它并不从数据流中读取数据,也不写入数据。它充当占位符。

ObjectWritable和GenericWritable

ObjectWritable 是对 java 基本类型(String,enum,Writable,null或这些类型组成的数组)的一个通用封装。它在 Hadoop RPC 中用于对方法的参数和返回类型进行封装和解封装。

Wriable集合类

io 软件包共有6个 Writable 集合类,分别是

  • ArrayWritable

  • ArrayPrimitiveWritable

  • TwoDArrayWritable

  • MapWritable

  • SortedMapWritable

  • EnumMapWritable

ArrayWritable 和 TwoDArrayWritable 是对 Writeble 的数组和两维数组(数组的数组)的实现。ArrayWritable 或 TwoDArrayWritable 中所有元素必须是同一类的实例。ArrayWritable 和 TwoDArrayWritable 都有get() ,set() 和 toArray()方法,toArray() 方法用于新建该数组的一个浅拷贝。

ArrayPrimitiveWritable 是对 Java 基本数组类型的一个封装。调用 set() 方法时,可以识别相应组件类型,因而无需通过继承该类来设置类型。

序列化框架

尽管大多数 Mapreduce 程序使用的都是 Writable 类型的键和值,但这并不是 MapReduce API 强制要求使用的。事实上,可以使用任何类型,只要能有一个机制对每个类型进行类型与二进制表示的来回转换就可以。

为了支持这个机制,Hadoop 有一个针对可替换序列化框架的 API 。序列化框架用一个 Serialization 实现来表示。Serialization 对象定义了从类型到 Serializer 实例(将对象转换为字节流)和 Deserializer 实例(将字节流转换为对象)的映射方式。

序列化IDL

还有许多其他序列化框架从不同的角度来解决问题:不通过代码来定义类型,而是使用接口定义语言以不依赖与具体语言的方式进行声明。由此,系统能够为其他语言生成模型,这种形式能有效提高互操作能力。它们一般还会定义版本控制方案。

两个比较流行的序列化框架 Apache Thrift 和Google的Protocol Buffers,常常用作二进制数据的永久存储格式。Mapreduce 格式对该类的支持有限,但在 Hadoop 内部,部分组件仍使用上述两个序列化框架来实现 RPC 和数据交换。

基于文件的数据结构

对于某些应用,我们需要一种特殊的数据结构来存储自己的数据。对于基于 Mapreduce 的数据处理,将每个二进制数据大对象单独放在各自的文件中不能实现可扩展性,所以 Hadoop 为此开发了很多更高层次的容器。

关于 SequenceFile 。

考虑日志文件,其中每一行文本代表一条日志记录。纯文本不适合记录二进制类型的数据。在这种情况下,Hadoop 的 SequenceFile 类非常合适,为二进制键值对提供了一个持久数据结构。将它作为日志文件的存储格式时,你可以自己选择键,以及值可以是 Writable 类型。

SequenceFile 也可以作为小文件的容器。HDFS和Mapreduce 是针对大文件优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。

SequnceFile的写操作

通过 createWriter()静态方法可以创建 SequenceFile 对象,并返回 SequnceFile.Writer 实例。该静态方法有多个重载版本,但都需要制定待写入的数据流,Configuration 对象,以及键和值的类型。存储在 SequenceFIle 中的键和值并不一定是 Writable 类型。只要能够被 Sertialization 序列化和反序列化,任何类型都可以。

SequenceFile的读操作

从头到尾读取顺序文件不外乎创建 SequenceFile.reader 实例后反复调用 next() 方法迭代读取记录。读取的是哪条记录与使用的序列化框架有关。如果使用的是 Writable 类型,那么通过键和值作为参数的 next() 方法可以将数据流的下一条键值对读入变量中。

  1. 通过命令行接口显示 SequenceFile。

hadoop fs 命令有一个 -text 选项可以以文本形式显示顺序文件。该选项可以查看文件的代码,由此检测出文件的类型并将其转换为相应的文本。该选项可以识别 gzip 压缩文件,顺序文件和 Avro 数据文件;否则,假设输入为纯文本文件。

  1. SequenceFile 的排序和合并。

Mapreduce 是对多个顺序文件进行排序(或合并)最有效的方法。Mapreduce 本身是并行的,并且可由你制定使用多少个 reducer 。例如,通过制定一个 reducer ,可以得到一个输出文件。

3.SequenceFile 的格式。

顺序文件由文件头和随后的一条或多条记录组成。顺序文件的前三个字节为 SEQ,紧随其后的一个字节表示顺序文件的版本号。文件头还包括其他字段,例如键和值的名称,数据压缩细节,用户定义的元数据以及同步标识。同步标识用于在读取文件时能够从任意位置开始识别记录边界。每个文件都有一个随机生成的同步标识,其值存储在文件头中,位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小于1%,所以没有必要在每条记录末尾添加该标识。

关于MapFile

MapFile 是已经排过序的 SequenceFile ,它有索引,所以可以按键查找。索引本身就是一个 SequenceFile ,包含 map 中的一小部分键。由于索引能够加载进内存,因此可以提供对主数据文件的快速查找。主数据文件则是另一个 SequenceFIle ,包含了所有的 map 条目,这些条目都按照键顺序进行了排序。

其他文件格式和面向列的格式

顺序文件和 map 文件是 Hadoop 中最早的,但并不是仅有的二进制文件格式,事实上,对于新项目而言,有更好的二进制格式可供选择。

Avro 数据文件在某些方面类似顺序文件,是面向大规模数据处理而设计的。但是 Avro 数据文件又是可移植的,它们可以跨越不同的编程语言使用。顺序文件,map 文件和 Avro 数据文件都是面向行的格式,意味着每一行的值在文件中是连续存储的。在面向列的格式中,文件中的行被分割成行的分片,然后每个分片以面向列的形式存储:首先存储每行第一列的值,然后是每行第2列的值,如此以往。

压缩

能够减少磁盘的占用空间和网络传输的量,并加速数据在网络和磁盘上的传输。

Hadoop 应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小,格式和所用的工具有关。比较各种压缩算法的压缩比和性能(从高到低):

  1. 使用容器文件格式,例如顺序文件, Avro 数据文件。ORCF 了说 Parquet 文件

  2. 使用支持切分的压缩格式,例如 bzip2 或者通过索引实现切分的压缩格式,例子如LZO。

  3. 在应用中将文件中切分成块,并使用任意一种他所格式为每个数据块建立压缩文件(不论它是否支持切分)。在这种情况下,需要合理选择数据大小,以确保压缩后的数据块的大小近似于HDFS块的大小。

  4. 存储未经压缩的文件。

重点:压缩和拆分一般是冲突的(压缩后的文件的 block 是不能很好地拆分独立运行,很多时候某个文件的拆分点是被拆分到两个压缩文件中,这时 Map 任务就无法处理,所以对于这些压缩,Hadoop 往往是直接使用一个 Map 任务处理整个文件的分析)。对大文件不可使用不支持切分整个文件的压缩格式,会失去数据的特性,从而造成 Mapreduce 应用效率低下。

Map 的输出结果也可以进行压缩,这样可以减少 Map 结果到 Reduce 的传输的数据量,加快传输速率。

在 Mapreduce 中使用压缩
FileOutputFormat.setCompressOutput(job,true);
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class);

如果输出生成顺序文件,可以设置 mapreduce.output.fileoutputformat.compress.type 属性来控制限制使用压缩格式。默认值是RECORD,即针对每条记录进行压缩。如果将其改为BLOCK,将针对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。

完整性
  • 检测数据是否损坏的常见措施是,在数据第一次引入系统时计算校验和并在数据通过一个不可靠的通道进行传输时再次计算校验和,这样就能发现数据是否损坏,如果计算所得的新校验和和原来的校验和不匹配,我们就认为数据已损坏。但该技术并不能修复数据。常见的错误检测码是 CRC-32(32位循环冗余检验),任何大小的数据输入均计算得到一个32位的整数校验和。

  • datanode 负责在收到数据后存储该数据及其校验和之前对数据进行验证。它在收到客户端的数据或复制其他 datanode 的数据时执行这个操作。正在写数据的客户端将数据及其校验和发送到由一系列 datanode 组成的管线,管线中最后一个 datanode 负责验证校验和。如果 datanode 检测到错误,客户端就会收到一个 IOException 异常的子类。

  • 客户端从 datanode 读取数据时,也会验证校验和,将它们与 datanode 中存储的校验和进行比较。每个datanode均持久保存有一个验证的校验和日志,所以它知道每个数据块的最后一次验证时间。客户端成功验证一个数据块后,会告诉这个 datanode , datanode 由此更新日志。保存这些统计信息对于检测损坏的磁盘很有价值。

  • 不只是客户端在读取数据块时会验证校验和,每个 datanode 也会在一个后台线程中运行一个 DataBlockScanner ,从而定期验证存储在这个 datanode 上的所有数据块。该项措施是解决物理存储媒体上位损坏的有力措施。

  • 由于 HDFS 存储着每个数据块的复本,因此它可以通过数据复本来修复损坏的数据块,进而得到一个新的,完好无损的复本。基本思路是,客户端在读取数据块时,如果检测到错误,首先向 namenode 报告已损坏的数据块及其正在尝试读取操作的这个 datanode ,再抛出 ChecksumException 异常。namenode 将这个数据块复本标记为已损坏,这样它不再将客户端处理请求直接发送到这个节点,或尝试将这个复本复制到另一个 datanode 。之后,它安排这个数据块的一个复本复制到另一个 datanode ,这样一来,数据块的复本因子又回到期望水平。此后,已损坏的数据块复本便被删除。

  • Hadoop的LocalFileSystem 执行客户端的校验和验证。这意味着在你写入一个名为 filename 的文件时,文件系统客户端会明确在包含每个文件快校验和的同一个目录内新建一个 filename.crc 隐藏文件。文件块的大小作为元数据存储在.crc文件中,所以即使文件块大小的设置已经发生变化,仍然可以正确读回文件。在读取文件时需要验证校验和,并且如果检测到错误,LocalFileSystem 还会抛出一个 ChecksumException 异常。

这篇关于Hadoop重点难点:Hadoop IO/压缩/序列化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169076

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1565(状态压缩)

本人第一道ac的状态压缩dp,这题的数据非常水,很容易过 题意:在n*n的矩阵中选数字使得不存在任意两个数字相邻,求最大值 解题思路: 一、因为在1<<20中有很多状态是无效的,所以第一步是选择有效状态,存到cnt[]数组中 二、dp[i][j]表示到第i行的状态cnt[j]所能得到的最大值,状态转移方程dp[i][j] = max(dp[i][j],dp[i-1][k]) ,其中k满足c

Java IO 操作——个人理解

之前一直Java的IO操作一知半解。今天看到一个便文章觉得很有道理( 原文章),记录一下。 首先,理解Java的IO操作到底操作的什么内容,过程又是怎么样子。          数据来源的操作: 来源有文件,网络数据。使用File类和Sockets等。这里操作的是数据本身,1,0结构。    File file = new File("path");   字

springboot体会BIO(阻塞式IO)

使用springboot体会阻塞式IO 大致的思路为: 创建一个socket服务端,监听socket通道,并打印出socket通道中的内容。 创建两个socket客户端,向socket服务端写入消息。 1.创建服务端 public class RedisServer {public static void main(String[] args) throws IOException {

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

android java.io.IOException: open failed: ENOENT (No such file or directory)-api23+权限受权

问题描述 在安卓上,清单明明已经受权了读写文件权限,但偏偏就是创建不了目录和文件 调用mkdirs()总是返回false. <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/><uses-permission android:name="android.permission.READ_E