Hadoop2源码分析-YARN RPC 示例介绍

2024-05-27 12:32

本文主要是介绍Hadoop2源码分析-YARN RPC 示例介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 我们知道在Hadoop的RPC当中,其主要由RPC,Client及Server这三个大类组成,分别实现对外提供编程接口、客户端实现及服务端实现。如下图所示:

 

 

  图中是Hadoop的RPC的一个类的关系图,大家可以到《Hadoop2源码分析-RPC探索实战》一文中,通过代码示例去理解他们之间的关系,这里就不多做赘述了。接下来,我们去看Yarn的RPC。

  Yarn对外提供的是YarnRPC这个类,这是一个抽象类,通过阅读YarnRPC的源码可以知道,实际的实现由参数yarn.ipc.rpc.class设定,默认情况下,其值为:org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC,部分代码如下:

  • YarnRPC:
复制代码
public abstract class YarnRPC {// ......public static YarnRPC create(Configuration conf) {LOG.debug("Creating YarnRPC for " + conf.get(YarnConfiguration.IPC_RPC_IMPL));String clazzName = conf.get(YarnConfiguration.IPC_RPC_IMPL);if (clazzName == null) {clazzName = YarnConfiguration.DEFAULT_IPC_RPC_IMPL;}try {return (YarnRPC) Class.forName(clazzName).newInstance();} catch (Exception e) {throw new YarnRuntimeException(e);}}}

复制代码
  • YarnConfiguration类:
复制代码
public class YarnConfiguration extends Configuration {//Configurationspublic static final String YARN_PREFIX = "yarn.";// IPC Configs
  public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";/** RPC class implementation*/public static final String IPC_RPC_IMPL =IPC_PREFIX + "rpc.class";public static final String DEFAULT_IPC_RPC_IMPL = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
}
复制代码

  而HadoopYarnProtoRPC 通过 RPC 的 RpcFactoryProvider 生成客户端工厂(由参数 yarn.ipc.client.factory.class 指定,默认值是 org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl)和服务器工厂 (由参数 yarn.ipc.server.factory.class 指定,默认值是 org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl),以根据通信协议的 Protocol Buffers 定义生成客户端对象和服务器对象。相关类的部分代码如下:

  • HadoopYarnProtoRPC
复制代码
public class HadoopYarnProtoRPC extends YarnRPC {private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);@Overridepublic Object getProxy(Class protocol, InetSocketAddress addr,Configuration conf) {LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,addr, conf);}@Overridepublic void stopProxy(Object proxy, Configuration conf) {RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);}@Overridepublic Server getServer(Class protocol, Object instance,InetSocketAddress addr, Configuration conf,SecretManager<? extends TokenIdentifier> secretManager,int numHandlers, String portRangeConfig) {LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + " with " + numHandlers + " handlers");return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, instance, addr, conf, secretManager, numHandlers, portRangeConfig);}}
复制代码
  • RpcFactoryProvider

复制代码
public class RpcFactoryProvider {// ......public static RpcClientFactory getClientFactory(Configuration conf) {String clientFactoryClassName = conf.get(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS);return (RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);}//......
  
}
复制代码
/** Factory to create client IPC classes.*/public static final String IPC_CLIENT_FACTORY_CLASS =IPC_PREFIX + "client.factory.class";public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS = "org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";

  在 YARN 中并未使用Hadoop自带的Writable来做序列化,而是使用 Protocol Buffers 作为默认的序列化机制,这带来的好处主要有以下几点:

  • 继承Protocol Buffers的优点:Protocol Buffers已被实践证明其拥有高效性、可扩展性、紧凑性以及跨语言性等特点。
  • 支持在线升级回滚:在Hadoop 2.x版本后,添加的HA方案,该方案能够进行主备切换,在不停止NNA节点服务的前提下,能够在线升级版本。

3.YARN的RPC示例

  YARN 的工作流程是先定义通信协议接口ResourceTracker,它包含2个函数,具体代码如下所示:

  • ResourceTracker:
复制代码
public interface ResourceTracker {@Idempotentpublic RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnException,IOException;@AtMostOncepublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)throws YarnException, IOException;}
复制代码

  这里ResourceTracker提供了Protocol Buffers定义和Java实现,其中设计的Protocol Buffers文件有:ResourceTracker.proto、yarn_server_common_service_protos.proto和yarn_server_common_protos.proto,文件路径在Hadoop的源码包的 hadoop-2.6.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto,这里就不贴出3个文件的具体代码类,大家可以到该目录去阅读这部分代码。这里需要注意的是,若是大家要编译这些文件需要安装 ProtoBuf 的编译环境,环境安装较为简单,这里给大家简要说明下。

  首先是下载ProtoBuf的安装包,然后解压,进入到解压目录,编译安装。命令如下:

./configure --prefix=/home/work /protobuf/  make && make install

最后编译 .proto 文件的命令:

protoc ./ResourceTracker.proto  --java_out=./

  下面,我们去收取Hadoop源码到本地工程,运行调试相关代码。

  • TestYarnServerApiClasses:

复制代码
public class TestYarnServerApiClasses {// ......// 列举测试4个方法  

@Testpublic void testRegisterNodeManagerResponsePBImpl() {RegisterNodeManagerResponsePBImpl original =new RegisterNodeManagerResponsePBImpl();original.setContainerTokenMasterKey(getMasterKey());original.setNMTokenMasterKey(getMasterKey());original.setNodeAction(NodeAction.NORMAL);original.setDiagnosticsMessage("testDiagnosticMessage");RegisterNodeManagerResponsePBImpl copy =new RegisterNodeManagerResponsePBImpl(original.getProto());assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());assertEquals(1, copy.getNMTokenMasterKey().getKeyId());assertEquals(NodeAction.NORMAL, copy.getNodeAction());assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());}@Testpublic void testNodeHeartbeatRequestPBImpl() {NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();original.setLastKnownContainerTokenMasterKey(getMasterKey());original.setLastKnownNMTokenMasterKey(getMasterKey());original.setNodeStatus(getNodeStatus());NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(original.getProto());assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());}@Testpublic void testNodeHeartbeatResponsePBImpl() {NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();original.setDiagnosticsMessage("testDiagnosticMessage");original.setContainerTokenMasterKey(getMasterKey());original.setNMTokenMasterKey(getMasterKey());original.setNextHeartBeatInterval(1000);original.setNodeAction(NodeAction.NORMAL);original.setResponseId(100);NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(original.getProto());assertEquals(100, copy.getResponseId());assertEquals(NodeAction.NORMAL, copy.getNodeAction());assertEquals(1000, copy.getNextHeartBeatInterval());assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());assertEquals(1, copy.getNMTokenMasterKey().getKeyId());assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());}@Testpublic void testRegisterNodeManagerRequestPBImpl() {RegisterNodeManagerRequestPBImpl original = new RegisterNodeManagerRequestPBImpl();original.setHttpPort(8080);original.setNodeId(getNodeId());Resource resource = recordFactory.newRecordInstance(Resource.class);resource.setMemory(10000);resource.setVirtualCores(2);original.setResource(resource);RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(original.getProto());assertEquals(8080, copy.getHttpPort());assertEquals(9090, copy.getNodeId().getPort());assertEquals(10000, copy.getResource().getMemory());assertEquals(2, copy.getResource().getVirtualCores());}}
复制代码
  • TestResourceTrackerPBClientImpl:

复制代码
public class TestResourceTrackerPBClientImpl {private static ResourceTracker client;private static Server server;private final static org.apache.hadoop.yarn.factories.RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);@BeforeClasspublic static void start() {System.out.println("Start client test");InetSocketAddress address = new InetSocketAddress(0);Configuration configuration = new Configuration();ResourceTracker instance = new ResourceTrackerTestImpl();server = RpcServerFactoryPBImpl.get().getServer(ResourceTracker.class, instance, address, configuration, null,1);server.start();client = (ResourceTracker) RpcClientFactoryPBImpl.get().getClient(ResourceTracker.class, 1,NetUtils.getConnectAddress(server), configuration);}@AfterClasspublic static void stop() {System.out.println("Stop client");if (server != null) {server.stop();}}/*** Test the method registerNodeManager. Method should return a not null* result.* */@Testpublic void testResourceTrackerPBClientImpl() throws Exception {RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);assertNotNull(client.registerNodeManager(request));ResourceTrackerTestImpl.exception = true;try {client.registerNodeManager(request);fail("there should be YarnException");} catch (YarnException e) {assertTrue(e.getMessage().startsWith("testMessage"));} finally {ResourceTrackerTestImpl.exception = false;}}/*** Test the method nodeHeartbeat. Method should return a not null result.* */@Testpublic void testNodeHeartbeat() throws Exception {NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);assertNotNull(client.nodeHeartbeat(request));ResourceTrackerTestImpl.exception = true;try {client.nodeHeartbeat(request);fail("there  should be YarnException");} catch (YarnException e) {assertTrue(e.getMessage().startsWith("testMessage"));} finally {ResourceTrackerTestImpl.exception = false;}}public static class ResourceTrackerTestImpl implements ResourceTracker {public static boolean exception = false;public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request)throws YarnException, IOException {if (exception) {throw new YarnException("testMessage");}return recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);}public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException {if (exception) {throw new YarnException("testMessage");}return recordFactory.newRecordInstance(NodeHeartbeatResponse.class);}}
}
复制代码

4.截图预览

  接下来,我们使用JUnit去测试代码,截图预览如下所示:

  • 对testRegisterNodeManagerRequestPBImpl()方法的一个DEBUG调试

  • testResourceTrackerPBClientImpl()方法的DEBUG调试

  这里由于设置exception的状态为true,在调用registerNodeManager()时,会打印一条测试异常信息。

if (exception) {
  throw new YarnException("testMessage");
}

5.总结

  在学习Hadoop YARN的RPC时,可以先了解Hadoop的RPC机制,这样在接触YARN的RPC的会比较好理解,YARN的RPC只是其中的一部分,后续会给大家分享更多关于YARN的内容。


转自:http://www.cnblogs.com/smartloli/p/4664842.html

这篇关于Hadoop2源码分析-YARN RPC 示例介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007484

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57