hadoop rpc机制 将avro引入hadoop rpc机制初探

2023-12-10 19:18

本文主要是介绍hadoop rpc机制 将avro引入hadoop rpc机制初探,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

转载:http://www.tbdata.org/archives/1413


1 RPC

RPC(Remote Procedure Call)——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

2 hadoop.ipc

2.1 Server

RPC Server实现了一种抽象的RPC服务,同时提供Call队列。

Ø  RPC Server结构

结构 功能
Server.ListenerRPC Server的监听者,用来接收RPC Client的连接请求和数据,其中数据封装成Call后PUSH到Call队列。
Server.HandlerRPC Server的Call处理者,和Server.Listener通过Call队列交互。
Server.ResponderRPC Server的响应者。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,交由Server.Responder来完成。
Server.ConnectionRPC Server数据接收者。提供接收数据,解析数据包的功能。
Server.Call持有客户端的Call信息。

Ø  RPC Server主要流程

RPC Server作为服务提供者由两个部分组成:接收Call调用和处理Call调用。

接收Call调用负责接收来自RPC Client的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:

²  Listener线程监视RPC Client发送过来的数据。

²  当有数据可以接收时,调用Connection的readAndProcess方法。

²  Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象PUSH到Call队列中,由Handler线程来处理Call队列中的所有Call。

处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:

²  Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call。

²  将Call交给RPC.Server处理。

²  借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。

²  返回响应。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,则交由Server.Responder来完成。

交互过程如下图所示:

图 RPC交互过程图

这里还需要提到的是,在namenode的高负荷运行的环境下,单线程的Listener线程在读取rpc call中带来的参数时,如果该rpc call的调用所带的参数非常的大(如BlockReport),那么Listener在读取这种rpc call的调用参数时就会花费很多的时间,那么此时单线程的Listener就成了namenode服务的瓶颈。所以后来对这种机制进行了优化,将对rpc call的参数读取的方式也换成异步的方式,在Listener中增加了一个reader pool,将需要读取参数的rpc call的read请求放入pool中,然后利用多个reader并行的读取,这样就能将listener线程给解放出来。reader读取参数后,构造出Call变量,放入Call队列中。因此,后续的hadoop版本中都会增加一个配置参数:ipc.server.read.threadpool.size,以达到优化rpc接收rpc call参数的效率。

2.2 Client

RPC Client是Client的实现和入口类。

Ø  RPC Client结构

结构 功能
Client.ConnectionId到RPC Server对象连接的标识。
Client.CallCall调用信息。
Client.ParallelResultsCall响应。
RPC.Invoker对InvocationHandler的实现,提供invoke方法,实现RPC Client对RPC Server对象的调用。
RPC.Invocation用来序列化和反序列化RPC Client的调用信息。(主要应用JAVA的反射机制和InputStream/OutputStream)

Ø  RPC Client主要流程

每一个Call都是由RPC Client发起。步骤说明:

²  RPC Client发起RPC Call,通过JAVA反射机制转化为对Client.call调用。

²  调用getConnection得到与RPC Server的连接。每一个RPC Client都维护一个HashMap结构的到RPC Server的连接池。具体建立连接的流程见下图。

图 RPC Client建立连接流程

²  通过Connection将序列化后的参数发送到RPC服务端。

²  阻塞方式等待RPC服务端返回响应。

2.3 同步

客户端发起的RPC调用是同步的,而服务端处理RPC调用是异步的。客户端调用线程以阻塞同步的方式发起RPC连接及RPC调用,将参数等信息发送给Listener,然后等待Connection接收响应返回。

Listener负责接收RPC连接和RPC数据,当一个Call的数据接收完后,组装成Call,并将Call放入由Handler提供的Call队列中。

Handler线程监听Call队列,如果Call队列不为空,则按FIFO方式取出Call,并转为实际调用,以非阻塞方式将响应发回给Connection,未发送完毕的响应交给Responder处理。

3 Avro

关于Avro与Thrift的比较,http://www.tbdata.org/archives/1307中做了详细的分析,本节主要介绍avro的一些细节。

3.1 综述

Avro完全依赖模式(Schema),通过Schema定义各种数据结构,只有确定了Schema才能对数据进行解释,所以在数据的序列化和反序列化之前,必须先确定Schema的结构。正是Schema的引入,使得数据具有了自描述的功能,同时能够实现动态加载,另外与其他的数据序列化系统如Thrift相比,数据之间不存在其他的任何标识,有利于提高数据处理的效率。Avro的诸多优势,使得Avro将成为代替Hadoop现有RPC的下一代通讯中间件系统。

Schema通过JSON对象表示。Schema定义了简单数据类型和复杂数据类型,其中复杂数据类型包含不同属性。通过各种数据类型用户可以自定义丰富的数据结构。

Schema由下列JSON对象之一定义:

1.        JSON字符串:命名

2.        JSON对象:{“type”: “typeName” …attributes…}

3.        JSON数组:Avro中Union的定义

3.2 数据类型

数据类型标准化的意义:一方面使不同系统对相同的数据能够正确解析,另一方面,数据类型的标准定义有利于数据序列化/反序列化。

Ø  简单数据类型

Avro定义了几种简单数据类型,下表是其简单说明。

类型说明
Nullno value
Booleana binary value
Int32-bit signed integer
Long64-bit signed integer
Floatsingle precision (32-bit) IEEE 754 floating-point number
Doubledouble precision (64-bit) IEEE 754 floating-point number
Bytessequence of 8-bit unsigned bytes
Stringunicode character sequence

简单数据类型由类型名称定义,不包含属性信息,例如字符串定义如下:

{“type”: “string”}

Ø  复杂数据类型

Avro定义了六种复杂数据类型,每一种复杂数据类型都具有独特的属性,下表就每一种复杂数据类型进行说明。

类型属性说明
records type nameRecord
namea JSON string providing the name of the record (required).
namespacea JSON string that qualifies the name(optional).
doca JSON string providing documentation to the user of this schema (optional).
aliasesa JSON array of strings, providing alternate names for this record (optional).
fieldsa JSON array, listing fields (required).
namea JSON string.
typea schema/a string of defined record.
defaulta default value for field when lack.
orderordering of this field.
Enumstype nameEnum
namea JSON string providing the name of the enum (required).
namespacea JSON string that qualifies the name.
doca JSON string providing documentation to the user of this schema (optional).
aliasesa JSON array of strings, providing alternate names for this enum (optional)
symbolsa JSON array, listing symbols, as JSON strings (required). All symbols in an enum must be unique.
Arraystype nameArray
itemsthe schema of the array’s items.
Mapstype nameMap
valuesthe schema of the map’s values.
Fixedtype nameFixed
namea string naming this fixed (required).
namespacea string that qualifies the name.
aliasesa JSON array of strings, providing alternate names for this enum (optional).
sizean integer, specifying the number of bytes per value (required).
Unionsa JSON arrays
P.S.: may not contain more than one schema with the same type, except for the named types record, fixed and enum

每一种复杂数据类型都含有各自的一些属性,其中部分属性是必需的,部分是可选的。例如:下图示为链表的Schema结构。其他类型的Schema结构实例以此类推这里就不一一列举。

这里需要说明Record类型中field属性的默认值,当Record Schema实例数据中某个field属性没有提供实例数据时,则由默认值提供,具体值见下表。Union的field默认值由Union定义中的第一个Schema决定。

avro typejson typedefault
NullNullNull
BooleanBooleanTrue
int,longInteger1
float,doubleNumber1.1
BytesString“\u00FF”
StringString“foo”
RecordObject{“a”: 1}
EnumString“FOO”
ArrayArray[1]
MapObject{“a”: 1}
FixedString“\u00ff”

3.3 数据序列化

Avro指定两种数据序列化编码方式:binary encoding和JSON encoding。其中各种数据类型的binary encoding规则如下所述:

Ø  简单数据类型

TypeEncodingExample
Nullzero bytenull
Booleana sigle byte{true: 1,false:0}
int/longvariable-length zig-zag coding 
Float4 bytesJava’s floatToIntBits
Double8 bytesJava’s doubleToLongBits
Bytesa long followed by that many bytes of data 
Stringa long followed by that many bytes of UTF-8 encoded character data”foo”:{3,f,o,o}

06 66 6f 6f

Ø  复杂数据类型

TypeEncoding
Recordsencoded as just the concatenation of the encodings of its fields
Enumsint representing the zero-based position of the symbol in the schema
Arraysencoded as series of blocks. A block with count 0 indicates the end of the array. block:{long,items}
mapsencoded as series of blocks. A block with count 0 indicates the end of the map. block:{long,key/value pairs}.
Unionsencoded by first writing a long value indicating the zero-based position within the union of the schema of its value. The value is then encoded per the indicated schema within the union.
fixedencoded using number of bytes declared in the schema.

下面就各种复杂数据类型的binary encoding举例说明。

例1:records

设:a = 27, b = “foo” ( enc:36(27), 06(3), 66(“f”), 6f(“o”) )

BC:36 06 66 6f 6f

例2:enums

设:”D”( enc:06(3) )

BC:06

例3:arrays

设:{ 3, 27 } (enc:04(2), 06(3), 36(27) )

BC:04 06 36 00

例4:maps

设:{ (“a”:1), (“b”:2) }

BC:02 61 02 02 62 04

例5:unions

设:(1) null; (2) “a”

BC:

(1) 02;说明:02代表null在union定义中的位置1;

(2) 00 02 61;说明:00为string在union定义的位置,02 61为”a”的编码。

3.4 排序

Avro为数据定义了一个标准的排列顺序。比较在很多时候是经常被使用到的对象之间的操作,标准定义方便有效的比较和排序。同时标准的定义可以方便对Avro的二进制编码数据直接进行排序而不需要反序列化。

只有当数据项包含相同的Schema的时候,数据之间的比较才有意义。数据的比较按照Schema深度优先,从左至右的顺序递归的进行。找到第一个不匹配即可终止比较。

两个拥有相同的模式的项的比较按照以下规则进行:

Ø  null总是相等。

Ø  int,long,float按照数值大小比较。

Ø  boolean是false在true之前。

Ø  string按照字典序进行比较。

Ø  bytes,fixed按照byte的字典序进行比较。

Ø  array按照元素的字典序进行比较。

Ø  enum按照符号在枚举中的位置比较。

Ø  record按照域的字典序排序,如果指定了以下属性:

²  “ascending”,域值的顺序不变。

²  “descending”,域值的顺序颠倒。

²  “ignore”,排序的时候忽略域值。

Ø  map不可进行比较。

3.5 对象容器文件

Avro定义了一个简单的对象容器文件格式。一个文件对应一个模式,所有存储在文件中的对象都是根据模式写入的。对象按照块进行存储,块可以采用压缩的方式存储。为了在进行mapreduce处理的时候有效的切分文件,在块之间采用了同步记号。一个文件可以包含任意用户定义的元数据。

一个文件由两部分组成:文件头和一个或者多个文件数据块。

文件头

Ø  四个字节,ASCII ‘O’, ‘b’, ‘j’, 1。

Ø  文件元数据,用于描述Schema。

Ø  16字节的文件同步记号。

其中,文件元数据的格式为:

Ø  值为-1的长整型,表明这是一个元数据块。

Ø  标识块长度的长整型。

Ø  标识块中key/value对数目的长整型。

Ø  每一个key/value对的string key和bytes value。

Ø  标识块中字节总数的4字节长的整数。

文件数据块

数据是以块结构进行组织的,一个文件可以包含一个或者多个文件数据块。

Ø  表示文件中块中对象数目的长整型。

Ø  表示块中数据序列化后的字节数长度的长整型。

Ø  序列化的对象。

Ø  16字节的文件同步记号。

当数据块的长度为0时即为文件数据块的最后一个数据,此后的所有数据被自动忽略。

下图示对象容器文件的结构分解及说明:

图 对象容器文件分解(引用自http://langyu.javaeye.com/blog/708568)

4 Avro RPC服务

对象容器文件是Avro的数据存储的具体实现,数据交换则由RPC服务提供,与对象容器文件类似,数据交换也完全依赖Schema,所以与Hadoop目前的RPC不同,Avro在数据交换之前需要通过握手过程先交换Schema。

握手过程

握手的过程是确保Server和Client获得对方的Schema定义,从而使Server能够正确反序列化请求信息,Client能够正确反序列化响应信息。一般的,Server/Client会缓存最近使用到的一些协议格式,所以,大多数情况下,握手过程不需要交换整个Schema文本。

所有的RPC请求和响应处理都建立在已经完成握手的基础上。对于无状态的连接,所有的请求响应之前都附有一次握手过程;对于有状态的连接,一次握手完成,整个连接的生命期内都有效。

具体过程:

Ø  Client发起HandshakeRequest,其中含有Client本身Schema Hash值和对应Server端的Schema Hash值(clientHash!=null, clientProtocol=null, serverHash!=null)。如果本地缓存有serverHash值则直接填充,如果没有则通过猜测填充。

Ø  Server用如下之一HandshakeResponse响应Client请求:

²  (match=BOTH, serverProtocol=null, serverHash=null):当Client发送正确的serverHash值且Server缓存相应的clientHash。握手过程完成,之后的数据交换都遵守本次握手结果。

²  (match=CLIENT, serverProtocol!=null, serverHash!=null):当Server缓存有Client的Schema,但是Client请求中Server Hash值不正确。此时Server发送Server端的Schema数据和相应的Hash值,此次握手完成,之后的数据交换都遵守本次握手结果。

²  (match=NONE):当Client发送的ServerHash不正确且Server端没有Client Schema的缓存。这种情况下Client需要重新提交请求信息 (clientHash!=null, clientProtocol!=null, serverHash!=null),Server响应 (match=BOTH, serverProtocol=null, serverHash=null),此次握手过程完成,之后的数据交换都遵守本次握手结果。

握手过程使用的Schema结构如下图示。

图 握手过程使用的Schema

消息帧格式

消息从客户端发送到服务器端需要经过传输层,它发送请求并接收服务器端的响应。到达传输层的数据就是二进制数据。通常以HTTP作为传输模型,数据以POST方式发送到对方去。在 Avro中消息首先分帧后被封装成为一组缓冲区(Buffer)。

数据帧的格式如下:

Ø  一系列Buffer:

²  4字节的Buffer长度

²  Buffer字节数据

Ø  长度为0的Buffer结束数据帧

Call格式

一个调用由请求消息,结果响应消息或者错误消息组成。请求和响应包含可扩展的元数据,两种消息都按照之前提出的方法分帧。

调用的请求格式为:

Ø  请求元数据,一个类型值的映射。

Ø  消息名,一个Avro字符串。

Ø  消息参数。参数根据消息的请求定义序列化。

调用的响应格式为:

Ø  响应的元数据,一个类型值的映射。

Ø  一字节的错误标志位。

Ø  如果错误标志为false,响应消息,根据响应的模式序列化。

如果错误标志位true,错误消息,根据消息的错误联合模式序列化。

| hadoop,云计算,所有 | Comments (1)

One Response

  1. llkfs 说:

    avro是个好东西,不过这种通信中间件太多了,thrift,protobuffer,avro,功能相近,性能有差距不大,avro相对其他产品的关键就在于他是cutting写的



这篇关于hadoop rpc机制 将avro引入hadoop rpc机制初探的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/478081

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

【Tools】大模型中的自注意力机制

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 自注意力机制(Self-Attention)是一种在Transformer等大模型中经常使用的注意力机制。该机制通过对输入序列中的每个元素计算与其他元素之间的相似性,

如何通俗理解注意力机制?

1、注意力机制(Attention Mechanism)是机器学习和深度学习中一种模拟人类注意力的方法,用于提高模型在处理大量信息时的效率和效果。通俗地理解,它就像是在一堆信息中找到最重要的部分,把注意力集中在这些关键点上,从而更好地完成任务。以下是几个简单的比喻来帮助理解注意力机制: 2、寻找重点:想象一下,你在阅读一篇文章的时候,有些段落特别重要,你会特别注意这些段落,反复阅读,而对其他部分

【Tools】大模型中的注意力机制

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 在大模型中,注意力机制是一种重要的技术,它被广泛应用于自然语言处理领域,特别是在机器翻译和语言模型中。 注意力机制的基本思想是通过计算输入序列中各个位置的权重,以确