【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信

2024-02-22 15:12

本文主要是介绍【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1. 集群内部通讯方法概述
  • 2. TaskManager向ResourceManager注册RPC服务
  • 3. JobMaster向ResourceManager申请Slot计算资源

现在我们已经知道Flink中RPC通信框架的底层设计与实现,接下来通过具体的实例了解集群运行时中组件如何基于RPC通信框架构建相互之间的调用关系。

1. 集群内部通讯方法概述

通过RegisteredRpcConnection进行连接注册与通讯(维护心跳等)

当TaskExecutor启动后,会立即向ResourceManager中注册当前TaskManager的信息。同样,JobMaster组件启动后也立即会向ResourceManager注册JobMaster的信息。这些注册操作实际上就是在构建集群中各个组件之间的RPC连接,这里的注册连接在Flink中被称为RegisteredRpcConnection,集群组件之间的RPC通信都会通过创建RegisteredRpcConnection进行,例如获取RpcEndpoint对应的RpcGateway接口以及维护组件之间的心跳连接等。

如下图,集群运行时中各组件的注册连接主要如下三种RegisteredRpcConnection的实现。

  • JobManagerRegisteredRpcConnection:用于管理TaskManager中与JobManager之间的RPC连接。
  • ResourceManagerConnection:用于管理JobManager中与ResourceManager之间的RPC连接。
  • TaskExecutorToResourceManagerConnection:用于管理TaskExecutor中与ResourceManager之间的RPC连接。

如下图再有:

  1. RegisteredRpcConnection提供了generateRegistration()抽象方法,主要用于生成组件之间的RPC连接,每次调用RegisteredRpcConnection.start()方法启动RegisteredRpcConnection时,都会创建新的RetryingRegistration。

不同RegisteredRpcConnection创建的RetryingRegistration也会有所不同,例如在TaskExecutorToResourceManagerConnection中就会创建ResourceManagerRegistration实例。

  1. 调用rpcService.connect(targetAddress, targetType) ,返回RpcGateway的代理对象,通过RpcGateway连接到目标RpcEndpoint上。
  2. 在RetryingRegistration中会提供invokeRegistration()抽象方法,用于实现子类的RPC注册操作。

例如在ResourceManagerRegistration中会实现invokeRegistration()方法,在方法中调用resourceManager.registerTaskExecutor()将TaskExecutor信息注册到ResourceManager中,这里的ResourceManager就是ResourceManagerGateway接口代理类。

  1. 调用onRegistrationSuccess()方法继续后续操作,例如在JobManagerRegisteredRpcConnection中会向jobLeaderListener添加当前的jobId等信息。
  2. 如果当前组件没有成功到注册至目标组件时,会调用onRegistrationFailure()抽象方法继续后续操作,包括连接重连或停止整个RpcEndpoint对应的服务。

在这里插入图片描述

接着以TaskManager向ResourceManager注册RPC服务为例,介绍整个RPC连接的注册过程。
 

2. TaskManager向ResourceManager注册RPC服务

TaskManager向ResourceManager注册RPC服务的过程如图所示。
在这里插入图片描述

  1. TaskExecutor节点正常启动后,调用RpcEndpoint.onStart()方法初始化并启动TaskExecutor组件的内部服务。
  2. 创建监听服务
  1. TaskExecutor调用resourceManagerLeaderRetriever.start()方法,启动ResourceManager组件领导节点的监听服务并传入ResourceManagerLeaderListener,用于监听ResourceManager的领导节点的变化情况。
  2. 当ResourceManagerLeaderListener接收到来自ResourceManager的leaderAddress以及leaderSessionID的信息后,调用notifyOfNewResourceManagerLeader()方法通知TaskExecutor和新的ResourceManagerLeader建立RPC连接。
  1. 创建与ResourceManager组件的RPC网络连接

a. 调用TaskExecutor.reconnectToResourceManager()内部方法,创建与ResourceManager组件之间的RPC网络连接。
b. 在reconnectToResourceManager()方法中会事先调用closeResourceManagerConnection()方法关闭之前的ResourceManager连接,然后依次调用tryConnectToResourceManager()和connectToResourceManager()方法创建与ResourceManager节点的RPC连接。

  1. 创建TaskExecutorRegistration对象

在connectToResourceManager()方法中会创建TaskExecutorRegistration对象,用于存储TaskManager的注册信息,其中包括taskExecutorAddress、resourceId以及dataPort等连接信息,同时还包含hardwareDescription、defaultSlotResourceProfile以及totalResourceProfile等资源描述信息。

  1. 正式建立网络连接

创建TaskExecutorToResourceManagerConnection实例,正式与ResourceManager建立RPC网络连接,同时调用TaskExecutorToResourceManagerConnection.start()方法启动RPC连接。实际上调用的是RegisteredRpcConnection.start()方法。

  1. 创建新的创建新的Registration与其他组件的RPC连接

在RegisteredRpcConnection中会调用内部方法createNewRegistration()创建新的Registration。而在createNewRegistration()方法中会调用generateRegistration()子类方法,创建与其他组件之间的RPC连接。这里主要调用的是TaskExecutorToResourceManagerConnection.generateRegistration()方法。

  1. 调用RetryingRegistration.startRegistration()方法注册具体的RPC连接,实际上会调用AkkaRpcService.connect()方法创建并获取ResourceManager对应的RpcGateway接口。
  2. 调用ResourceManagerGateway.registerTaskExecutor()方法,最终完成在ResourceManager中注册TaskManager的操作。创建的TaskExecutorRegistration同时会传递给ResourceManager。
  3. 当ResourceManager接收到TaskManager的注册信息后,会在本地维护TaskManager的注册信息,并建立与TaskManager组件之间的心跳连接,至此完成了TaskManager启动后向ResourceManager进行RPC网络连接注册的全部流程。

如代码所示

  • TaskExecutor.connectToResourceManager()方法中首先会创建TaskExecutorRegistration注册信息和TaskExecutorToResourceManagerConnection对象。
  • 然后调用TaskExecutorToResourceManagerConnection.start()方法启动TaskManager和ResourceManager之间的RPC注册连接。
private void connectToResourceManager() {assert(resourceManagerAddress != null);assert(establishedResourceManagerConnection == null);assert(resourceManagerConnection == null);log.info("Connecting to ResourceManager {}.", resourceManagerAddress);// TaskExecutor注册信息final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(getAddress(),getResourceID(),taskManagerLocation.dataPort(),hardwareDescription,taskManagerConfiguration.getDefaultSlotResourceProfile(),taskManagerConfiguration.getTotalResourceProfile());resourceManagerConnection =new TaskExecutorToResourceManagerConnection(log,getRpcService(),taskManagerConfiguration.getRetryingRegistrationConfiguration(),resourceManagerAddress.getAddress(),resourceManagerAddress.getResourceManagerId(),getMainThreadExecutor(),new ResourceManagerRegistrationListener(),taskExecutorRegistration);resourceManagerConnection.start();
}

接着看RegisteredRpcConnection.start()的代码逻辑,如代码所示。

public void start() {checkState(!closed, "The RPC connection is already closed");checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");// 创建RetryingRegistrationfinal RetryingRegistration<F, G, S> newRegistration = createNewRegistration();// 启动RetryingRegistrationif (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {newRegistration.startRegistration();} else {// 并行启动后,直接取消当前RegistrationnewRegistration.cancel();}
}

关注:RetryingRegistration.startRegistration()逻辑。

  1. 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway。
  2. 创建RpcGateway代理类后,就可以连接到指定的RpcEndpoint了。对于rpcService.connect()方法的定义,我们已经在RPC底层原理中介绍过。
  3. 创建RPC连接后,尝试注册操作。
  4. 如果注册失败,则进行Retry操作,除非接收到取消操作的消息。
public void startRegistration() {if (canceled) {return;}try {final CompletableFuture<G> rpcGatewayFuture;// 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGatewayif (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));} else {rpcGatewayFuture = rpcService.connect(targetAddress, targetType);}// 成功获取网关后,尝试注册操作CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync((G rpcGateway) -> {log.info("Resolved {} address, beginning registration", targetName);register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());},rpcService.getExecutor());// 如果注册失败,则进行Retry操作,除非取消操作rpcGatewayAcceptFuture.whenCompleteAsync((Void v, Throwable failure) -> {if (failure != null && !canceled) {final Throwable strippedFailure =ExceptionUtils.stripCompletionException(failure);// 间隔指定时间后再次注册startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());}},rpcService.getExecutor());}catch (Throwable t) {completionFuture.completeExceptionally(t);cancel();}}

继续了解ResourceManagerRegistration.invokeRegistration()的具体实现。

该方法会创建和ResourceManagerGateway之间的连接以及注册操作
resourceManager会接收来自TaskExecutor的注册信息,并根据taskExecutorRegistration提供的注册信息,将TaskExecutor信息记录在ResourceManager的本地存储中,然后开启TaskExecutor之间的心跳连接。至此,TaskManager能和ResourceManager进行正常的RPC通信了。

protected CompletableFuture<RegistrationResponse> invokeRegistration(ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {Time timeout = Time.milliseconds(timeoutMillis);return resourceManager.registerTaskExecutor(taskExecutorRegistration,timeout);
}

对于其他组件之间的RpcConnection注册操作,例如TaskManager与JobMaster之间的RPC连接注册,基本上和ResourceManagerRegistration一样,这里暂不介绍。

接下来我们看JobMaster是如何向ResourceManager申请Slot计算资源的。

 

3. JobMaster向ResourceManager申请Slot计算资源

当JobMaster组件启动后,

  • 会(调用JobMaster.startJobMasterServices())启动JobMaster中的内部服务,其中就包括了SlotPool组件。
  • 同时会创建和启动JobMaster与ResourceManager之间的RPC连接ResourceManagerConnection。创建成功后,会执行包括向ResourceManager发送申请Slot计算资源的RPC请求等后续操作。

如代码所示

//从SlotPoolImpl.connectToResourceManager()可以看出,方法中分别遍历
//waitingForResourceManager集合中的PendingRequest,
//然后就每个PendingRequest调用requestSlotFromResourceManager()方法向
//ResourceManager申请PendingRequest中指定的Slot计算资源。
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {this.resourceManagerGateway = checkNotNull(resourceManagerGateway);for (PendingRequest pendingRequest : waitingForResourceManager.values()) {requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);}waitingForResourceManager.clear();
}

继续看SlotPoolImpl.requestSlotFromResourceManager()方法的实现,如下代码所示。

  1. 创建AllocationID并将pendingRequest和AllocationID存储在pendingRequests集合中。
  2. 判断pendingRequest是否出现异常或已经分配了其他AllocationID,如果出现异常或已分配则取消当前pendingRequest中的资源分配请求。
  3. 调用resourceManagerGateway.requestSlot()远程RPC方法向ResourceManager申请Slot计算资源,此时会在方法中创建SlotRequest对象,指定申请Slot计算资源的具体参数。
  4. ResourceManager接收到SlotPool发送的SlotRequest请求后,会将SlotRequest转发给SlotManager进行处理,此时如果能正常分配到Slot资源,则会返回Acknowledge信息。
  5. 调用FutureUtils.whenCompleteAsyncIfNotDone()方法执行返回rmResponse CompletableFuture的对象,此时如果Slot资源申请过程出现异常,则调用slotRequestToResourceManager-Failed()方法进行处理。
private void requestSlotFromResourceManager(final ResourceManagerGateway resourceManagerGateway,final PendingRequest pendingRequest) {checkNotNull(resourceManagerGateway);checkNotNull(pendingRequest);log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());final AllocationID allocationId = new AllocationID();pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId,pendingRequest);pendingRequest.getAllocatedSlotFuture().whenComplete((AllocatedSlot allocatedSlot, Throwable throwable) -> {if (throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {resourceManagerGateway.cancelSlotRequest(allocationId);}});CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(jobMasterId,new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),rpcTimeout);FutureUtils.whenCompleteAsyncIfNotDone(rmResponse,componentMainThreadExecutor,(Acknowledge ignored, Throwable failure) -> {if (failure != null) {slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);}});
}

从以上实例可以看出,集群运行时中各个组件之间都是基于RPC通信框架相互访问的。RpcEndpoint组件会创建与其他RpcEndpoint之间的RegisteredRpcConnection,并通过RpcGateway接口的动态代理类与其他组件进行通信。

需要注意的是,Flink把Akka作为RPC底层的通信框架,但没有使用Akka其他丰富的监督功能,并且未来有去掉Akka依赖的可能。

 
参考:《Flink设计与实现:核心原理与源码解析》–张利兵

这篇关于【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/735651

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

day-51 合并零之间的节点

思路 直接遍历链表即可,遇到val=0跳过,val非零则加在一起,最后返回即可 解题过程 返回链表可以有头结点,方便插入,返回head.next Code /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}*

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识