Flink Source Code Series (TaskExecutor Registers with the ResourceManager [Flink's own RM, not YARN's RM]) - Part 10


Previous part: Part 9.

Picking up from the previous part, where we reached the main method of YarnTaskExecutorRunner, we continue the analysis.

1.YarnTaskExecutorRunner#main->YarnTaskExecutorRunner#runTaskManagerSecurely->TaskManagerRunner#runTaskManagerSecurely

	public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
		replaceGracefulExitWithHaltIfConfigured(configuration);
		final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
		FileSystem.initialize(configuration, pluginManager);
		SecurityUtils.install(new SecurityConfiguration(configuration));
		SecurityUtils.getInstalledContext().runSecured(() -> {
			runTaskManager(configuration, pluginManager);
			return null;
		});
	}
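Note that `runSecured` takes a `Callable`, which is why the lambda ends with `return null` even though `runTaskManager` returns nothing. The sketch below shows just that wrapping pattern. It assumes a simplified, hypothetical `SecurityContext` interface and a no-op context that runs the callable directly. Flink's real security contexts (for example the Hadoop/Kerberos one) do more, and `SecuredStartupSketch` is purely illustrative.

```java
import java.util.concurrent.Callable;

// Simplified, hypothetical stand-in for a security context.
interface SecurityContext {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// Hypothetical no-op context: runs the callable directly on the calling thread.
// A real context would run it with the installed credentials.
class NoOpSecurityContext implements SecurityContext {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}

class SecuredStartupSketch {
    static int startCount = 0;

    // Hypothetical stand-in for TaskManagerRunner.runTaskManager(...).
    static void runTaskManager(String configName) {
        startCount++;
        System.out.println("Starting TaskManager with " + configName);
    }

    static void runTaskManagerSecurely(SecurityContext context, String configName) throws Exception {
        // runTaskManager(...) returns void, but Callable<T> must return a value,
        // hence the explicit "return null".
        context.runSecured(() -> {
            runTaskManager(configName);
            return null;
        });
    }
}
```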

2. TaskManagerRunner#runTaskManager

	public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
		final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
		taskManagerRunner.start();
	}

①new TaskManagerRunner

Constructs the TaskManagerRunner.

②taskManagerRunner.start()

Starts it.

3.TaskManagerRunner#start->TaskExecutorToServiceAdapter#start->RpcEndpoint#start

rpcServer.start()

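Starts the RPC service, i.e. sends a message telling the underlying AkkaRpcActor to switch to the START state. Once the actor has started, the endpoint's onStart method is invoked, so we can go straight to TaskExecutor's onStart method.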
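To make the ordering "enqueue a start message, then run onStart on the endpoint's own thread" concrete, here is a heavily simplified sketch. It assumes a single-threaded executor can stand in for the actor's mailbox (the endpoint's main thread). `MiniRpcEndpoint` and `MiniTaskExecutor` are hypothetical names, not Flink classes, and the real AkkaRpcActor state machine has more states and error handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, heavily simplified model of RpcEndpoint + AkkaRpcActor.
// A single-threaded executor plays the role of the actor mailbox / main thread.
abstract class MiniRpcEndpoint {
    enum State { STOPPED, STARTED }

    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "mini-endpoint-main-thread");
        thread.setDaemon(true); // do not keep the JVM alive
        return thread;
    });
    private volatile State state = State.STOPPED;

    // Like rpcServer.start(): it only enqueues a "start" message. The state
    // switch and the onStart() callback happen later, on the main thread.
    CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(() -> {
            if (state == State.STOPPED) {
                state = State.STARTED;
                onStart();
            }
        }, mainThread);
    }

    State getState() {
        return state;
    }

    void shutdown() {
        mainThread.shutdown();
    }

    // Subclasses put their startup logic here.
    protected abstract void onStart();
}

class MiniTaskExecutor extends MiniRpcEndpoint {
    volatile boolean servicesStarted = false;

    @Override
    protected void onStart() {
        // In TaskExecutor, onStart() goes on to startTaskExecutorServices().
        servicesStarted = true;
    }
}
```

Running the callback on the endpoint's single main thread is what lets the real endpoint mutate its fields without extra locking; the sketch mirrors that by never calling `onStart()` from the caller's thread.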

4.TaskExecutor#onStart->TaskExecutor#startTaskExecutorServices

	private void startTaskExecutorServices() throws Exception {
		try {
			// start by connecting to the ResourceManager
			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
			// tell the task slot table who's responsible for the task slot actions
			taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
			// start the job leader service
			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
			fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
		} catch (Exception e) {
			handleStartTaskExecutorServicesException(e);
		}
	}
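The first statement only hands a listener to the leader retrieval service; nothing connects until the service reports a leader. In the standalone (non-HA) case the ResourceManager address is fixed, so the service can report it right away from `start`. A minimal sketch of that callback shape, assuming simplified hypothetical types (`LeaderListener`, `FixedLeaderRetrievalService`, `RecordingListener`) rather than Flink's actual interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified, hypothetical stand-in for a leader retrieval listener.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical model of a standalone (non-HA) retrieval service:
// the leader address is fixed, so start() reports it immediately.
class FixedLeaderRetrievalService {
    private final String leaderAddress;
    private final UUID leaderId;
    private LeaderListener listener;

    FixedLeaderRetrievalService(String leaderAddress, UUID leaderId) {
        this.leaderAddress = leaderAddress;
        this.leaderId = leaderId;
    }

    void start(LeaderListener listener) {
        if (this.listener != null) {
            throw new IllegalStateException("Retrieval service already started");
        }
        this.listener = listener;
        // No election to wait for: notify the listener right away.
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}

// Records notifications. In TaskExecutor, the real listener forwards them
// towards notifyOfNewResourceManagerLeader(...).
class RecordingListener implements LeaderListener {
    final List<String> addresses = new ArrayList<>();

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        addresses.add(leaderAddress + "#" + leaderSessionId);
    }
}
```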

5.StandaloneLeaderRetrievalService#start->TaskExecutor's inner class ResourceManagerLeaderListener#notifyLeaderAddress->TaskExecutor#notifyOfNewResourceManagerLeader->TaskExecutor#reconnectToResourceManager->TaskExecutor#tryConnectToResourceManager->TaskExecutor#connectToResourceManager

	private void connectToResourceManager() {
		assert(resourceManagerAddress != null);
		assert(establishedResourceManagerConnection == null);
		assert(resourceManagerConnection == null);
		log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
		final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
			getAddress(),
			getResourceID(),
			unresolvedTaskManagerLocation.getDataPort(),
			JMXService.getPort().orElse(-1),
			hardwareDescription,
			memoryConfiguration,
			taskManagerConfiguration.getDefaultSlotResourceProfile(),
			taskManagerConfiguration.getTotalResourceProfile());
		resourceManagerConnection =
			new TaskExecutorToResourceManagerConnection(
				log,
				getRpcService(),
				taskManagerConfiguration.getRetryingRegistrationConfiguration(),
				resourceManagerAddress.getAddress(),
				resourceManagerAddress.getResourceManagerId(),
				getMainThreadExecutor(),
				new ResourceManagerRegistrationListener(),
				taskExecutorRegistration);
		resourceManagerConnection.start();
	}
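The chain from the leader notification down to `connectToResourceManager` boils down to: remember the new leader address, close any existing connection, and connect only if an address is known. That ordering is what makes the `assert`s at the top of the method hold. A simplified sketch of that control flow, assuming hypothetical string stand-ins for the address and the connection object (the real methods do more bookkeeping than shown here):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the TaskExecutor call chain from a
// leader notification down to connectToResourceManager(). Not Flink's code.
class ReconnectSketch {
    private String resourceManagerAddress;    // latest known RM leader address (may be null)
    private String resourceManagerConnection; // stands in for TaskExecutorToResourceManagerConnection
    final List<String> log = new ArrayList<>();

    void notifyOfNewResourceManagerLeader(String newLeaderAddress) {
        resourceManagerAddress = newLeaderAddress;
        reconnectToResourceManager();
    }

    private void reconnectToResourceManager() {
        closeResourceManagerConnection();
        tryConnectToResourceManager();
    }

    private void closeResourceManagerConnection() {
        if (resourceManagerConnection != null) {
            log.add("closed " + resourceManagerConnection);
            resourceManagerConnection = null;
        }
    }

    private void tryConnectToResourceManager() {
        // Assumption: a null address means there is no leader, so stay disconnected.
        if (resourceManagerAddress != null) {
            connectToResourceManager();
        }
    }

    private void connectToResourceManager() {
        // Mirrors the preconditions asserted in the Flink method.
        if (resourceManagerAddress == null || resourceManagerConnection != null) {
            throw new IllegalStateException("unexpected connection state");
        }
        resourceManagerConnection = "connection-to-" + resourceManagerAddress;
        log.add("connecting to " + resourceManagerAddress);
    }
}
```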

6.RegisteredRpcConnection#start

	public void start() {
		checkState(!closed, "The RPC connection is already closed");
		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
			newRegistration.startRegistration();
		} else {
			// concurrent start operation
			newRegistration.cancel();
		}
	}

①createNewRegistration

Creates the registration through which the TaskExecutor (TE) registers with the ResourceManager (RM).

②startRegistration

Starts this registration.
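The `compareAndSet` on `REGISTRATION_UPDATER` is what makes `start()` safe against concurrent callers: only the thread that swaps `pendingRegistration` from `null` to its new registration starts it, and any other caller cancels its own attempt. A minimal sketch of the same guard with `java.util.concurrent.atomic.AtomicReferenceFieldUpdater`, assuming a hypothetical `Registration` class that only records whether it was started or canceled and a hypothetical `GuardedConnection` in place of `RegisteredRpcConnection`:

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for RetryingRegistration: only tracks start/cancel.
class Registration {
    volatile boolean started;
    volatile boolean canceled;

    void startRegistration() { started = true; }

    void cancel() { canceled = true; }
}

// Hypothetical, simplified model of the start() guard in RegisteredRpcConnection.
class GuardedConnection {
    // A field updater needs a volatile, non-static field of exactly this type.
    private static final AtomicReferenceFieldUpdater<GuardedConnection, Registration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(GuardedConnection.class, Registration.class, "pendingRegistration");

    private volatile Registration pendingRegistration;

    /** Returns true if this call won the race and started its registration. */
    boolean start() {
        Registration newRegistration = new Registration();
        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            newRegistration.startRegistration();
            return true;
        } else {
            // Another caller started first: discard this attempt.
            newRegistration.cancel();
            return false;
        }
    }
}
```

A common reason to choose a field updater over an `AtomicReference` field is that it avoids one extra object per instance; the cost is that the field is located reflectively by name and must be `volatile`.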

7.RetryingRegistration#startRegistration

	public void startRegistration() {
		if (canceled) {
			// we already got canceled
			return;
		}
		try {
			// trigger resolution of the target address to a callable gateway
			final CompletableFuture<G> rpcGatewayFuture;
			if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
				rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
					targetAddress,
					fencingToken,
					targetType.asSubclass(FencedRpcGateway.class));
			} else {
				rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
			}
			// upon success, start the registration attempts
			CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
				(G rpcGateway) -> {
					log.info("Resolved {} address, beginning registration", targetName);
					register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
				},
				rpcService.getExecutor());
			// upon failure, retry, unless this is cancelled
			rpcGatewayAcceptFuture.whenCompleteAsync(
				(Void v, Throwable failure) -> {
					if (failure != null && !canceled) {
						final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
						if (log.isDebugEnabled()) {
							log.debug(
								"Could not resolve {} address {}, retrying in {} ms.",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure);
						} else {
							log.info(
								"Could not resolve {} address {}, retrying in {} ms: {}",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure.getMessage());
						}
						startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
					}
				},
				rpcService.getExecutor());
		}
		catch (Throwable t) {
			completionFuture.completeExceptionally(t);
			cancel();
		}
	}

①rpcService.connect

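Resolves the target address to a callable gateway (using the fenced variant of connect when the target type is a FencedRpcGateway).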

②register

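Once resolution succeeds, the registration attempts begin. If resolution fails and the registration has not been canceled, startRegistrationLater schedules another attempt after the configured error delay (getErrorDelayMillis).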
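The overall shape of `startRegistration` is: resolve the gateway asynchronously, register on success, and on failure schedule another attempt after a fixed error delay unless canceled. The sketch below reproduces only that shape with plain `CompletableFuture` and a `ScheduledExecutorService`. `RetryingConnectSketch` and its `connect`/`register` functions are hypothetical; Flink's own `register(...)` additionally has its own retry handling for the registration RPC itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of RetryingRegistration#startRegistration.
// The names are illustrative only; they are not Flink APIs.
class RetryingConnectSketch<G, R> {
    private final Supplier<CompletableFuture<G>> connect; // plays the role of rpcService.connect(...)
    private final Function<G, R> register;                 // plays the role of register(gateway, ...)
    private final long errorDelayMillis;                   // like getErrorDelayMillis()
    private final ScheduledExecutorService executor;       // like rpcService.getExecutor()
    final CompletableFuture<R> completionFuture = new CompletableFuture<>();
    private volatile boolean canceled;

    RetryingConnectSketch(Supplier<CompletableFuture<G>> connect, Function<G, R> register,
                          long errorDelayMillis, ScheduledExecutorService executor) {
        this.connect = connect;
        this.register = register;
        this.errorDelayMillis = errorDelayMillis;
        this.executor = executor;
    }

    void startRegistration() {
        if (canceled) {
            return; // we already got canceled
        }
        // upon successful resolution, run the registration
        CompletableFuture<Void> acceptFuture = connect.get().thenAcceptAsync(
                gateway -> completionFuture.complete(register.apply(gateway)), executor);
        // upon failure, try again after the error delay, unless canceled
        acceptFuture.whenCompleteAsync((ignored, failure) -> {
            if (failure != null && !canceled) {
                executor.schedule(this::startRegistration, errorDelayMillis, TimeUnit.MILLISECONDS);
            }
        }, executor);
    }

    void cancel() {
        canceled = true;
        completionFuture.cancel(false);
    }
}
```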

8.RetryingRegistration#register->TaskExecutorToResourceManagerConnection's inner class ResourceManagerRegistration#invokeRegistration->ResourceManager#registerTaskExecutorInternal

	private RegistrationResponse registerTaskExecutorInternal(
			TaskExecutorGateway taskExecutorGateway,
			TaskExecutorRegistration taskExecutorRegistration) {
		ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
		if (oldRegistration != null) {
			// TODO :: suggest old taskExecutor to stop itself
			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId.getStringWithMetadata());
			// remove old task manager registration from slot manager
			slotManager.unregisterTaskManager(
				oldRegistration.getInstanceID(),
				new ResourceManagerException(String.format("TaskExecutor %s re-connected to the ResourceManager.", taskExecutorResourceId.getStringWithMetadata())));
		}
		final WorkerType newWorker = workerStarted(taskExecutorResourceId);
		String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
		if (newWorker == null) {
			log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
				"not recognize it", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			return new RegistrationResponse.Decline("unrecognized TaskExecutor");
		} else {
			WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
				taskExecutorGateway,
				newWorker,
				taskExecutorRegistration.getDataPort(),
				taskExecutorRegistration.getJmxPort(),
				taskExecutorRegistration.getHardwareDescription(),
				taskExecutorRegistration.getMemoryConfiguration());
			log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			taskExecutors.put(taskExecutorResourceId, registration);
			taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				@Override
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				@Override
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
			});
			return new TaskExecutorRegistrationSuccess(
				registration.getInstanceID(),
				resourceId,
				clusterInformation);
		}
	}

①getResourceId

Gets the TE's resource ID.

②taskExecutors.remove

Removes any previously cached registration for this resource ID.

③slotManager.unregisterTaskManager

If an old registration existed, unregisters that old TaskManager from the slotManager.

④workerStarted

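Callback invoked when a worker has been started; it returns the WorkerType. If it returns null, the framework does not recognize the TaskExecutor and the registration is declined.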

⑤getTaskExecutorAddress

Gets the TE's address.

⑥new WorkerRegistration<>

Builds the WorkerRegistration.

⑦log.info, taskExecutors.put

Logs that the TM is being registered at the RM, then puts the registration into the cache (the taskExecutors map).

⑧taskManagerHeartbeatManager.monitorTarget

Monitors the TM as a heartbeat target; the RM sends heartbeat requests to it.

⑨new TaskExecutorRegistrationSuccess

Creates and returns the registration-success response.
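Stripped of Flink types, steps ① to ⑨ amount to a replace-on-reconnect registry keyed by resource ID. A minimal sketch under simplifying assumptions: strings stand in for `ResourceID` and addresses, random `UUID`s for `InstanceID`, a fixed set of recognized IDs for `workerStarted`, and plain collections for the slot manager and the heartbeat manager. `RmRegistrySketch` and all its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of ResourceManager#registerTaskExecutorInternal.
// Strings stand in for ResourceID, UUIDs for InstanceID; not Flink's API.
class RmRegistrySketch {
    // Either a success carrying the new instance id, or a decline with a reason.
    static final class Response {
        final boolean success;
        final String detail;

        Response(boolean success, String detail) {
            this.success = success;
            this.detail = detail;
        }
    }

    private final Map<String, UUID> taskExecutors = new HashMap<>(); // resourceId -> instanceId
    private final Set<String> recognizedWorkers;                      // what workerStarted() would accept
    final List<UUID> unregisteredFromSlotManager = new ArrayList<>(); // old instance ids dropped on re-registration
    final Set<String> heartbeatTargets = new HashSet<>();             // monitored TaskExecutors

    RmRegistrySketch(Set<String> recognizedWorkers) {
        this.recognizedWorkers = recognizedWorkers;
    }

    Response registerTaskExecutor(String resourceId, String address) {
        // step 2: drop any cached registration for the same resource id
        UUID oldInstanceId = taskExecutors.remove(resourceId);
        if (oldInstanceId != null) {
            // step 3: the TaskExecutor re-connected, forget its old registration
            unregisteredFromSlotManager.add(oldInstanceId);
        }
        // step 4: stand-in for workerStarted(), unknown workers are declined
        if (!recognizedWorkers.contains(resourceId)) {
            return new Response(false, "unrecognized TaskExecutor");
        }
        // steps 6-7: create the new registration, log it, and cache it
        UUID instanceId = UUID.randomUUID();
        System.out.printf("Registering TaskManager %s (%s)%n", resourceId, address);
        taskExecutors.put(resourceId, instanceId);
        // step 8: start treating it as a heartbeat target
        heartbeatTargets.add(resourceId);
        // step 9: success response carrying the new instance id
        return new Response(true, instanceId.toString());
    }

    UUID instanceIdOf(String resourceId) {
        return taskExecutors.get(resourceId);
    }
}
```

As in the source, the stale entry is removed before the recognition check, so a re-connecting TaskExecutor always loses its old registration, even if the new attempt is then declined.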

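Because this goes through an RPC call, the registration-success response is sent back to the TaskExecutor side, where it triggers the onRegistrationSuccess callback in TaskExecutor. We will analyze the rest in the next part.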

Overview

[Figure: flow chart of the source-code path covered in this part]

See you in the next part!




http://www.chinasem.cn/article/528948
