Flink Source Code Series (TaskExecutor Registers with the ResourceManager [Flink-internal RM, not YARN's RM]) - Part 10



The previous installment ended at the main method of YarnTaskExecutorRunner. This installment continues the analysis from that point.

1. YarnTaskExecutorRunner#main->YarnTaskExecutorRunner#runTaskManagerSecurely->TaskManagerRunner#runTaskManagerSecurely

	public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
		replaceGracefulExitWithHaltIfConfigured(configuration);
		final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
		FileSystem.initialize(configuration, pluginManager);
		SecurityUtils.install(new SecurityConfiguration(configuration));
		SecurityUtils.getInstalledContext().runSecured(() -> {
			runTaskManager(configuration, pluginManager);
			return null;
		});
	}
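The method follows a simple pattern. All setup runs first, and the actual startup (runTaskManager) then executes inside the installed security context through runSecured. The sketch below shows that shape under stated assumptions. SimpleSecurityContext and SecureLauncherSketch are hypothetical names. The no-op context just calls the callable, and the string trace only stands in for plugin and file-system initialization. Flink's real SecurityUtils does considerably more than this.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for a security context (not Flink's SecurityContext).
interface SimpleSecurityContext {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

final class SecureLauncherSketch {

    // A no-op context that simply runs the callable, as in an unsecured setup.
    static final SimpleSecurityContext NO_OP = new SimpleSecurityContext() {
        @Override
        public <T> T runSecured(Callable<T> securedCallable) throws Exception {
            return securedCallable.call();
        }
    };

    // Same shape as runTaskManagerSecurely: prepare the environment first,
    // then run the actual startup logic inside the installed context.
    static String launch(SimpleSecurityContext context, StringBuilder trace) throws Exception {
        trace.append("init-plugins;");    // stands in for the plugin manager creation
        trace.append("init-filesystem;"); // stands in for FileSystem.initialize(...)
        return context.runSecured(() -> {
            trace.append("run-task-manager;"); // stands in for runTaskManager(...)
            return "started";
        });
    }

    public static void main(String[] args) throws Exception {
        StringBuilder trace = new StringBuilder();
        System.out.println(launch(NO_OP, trace) + " " + trace);
        // prints: started init-plugins;init-filesystem;run-task-manager;
    }
}
```

Swapping NO_OP for a context that performs a login before calling the callable changes nothing in launch. That is the point of routing startup through runSecured.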

2. TaskManagerRunner#runTaskManager

	public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
		final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
		taskManagerRunner.start();
	}

①new TaskManagerRunner

Constructs the TaskManagerRunner.

②taskManagerRunner.start()

Starts it.

3. TaskManagerRunner#start->TaskExecutorToServiceAdapter#start->RpcEndpoint#start

rpcServer.start()

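This starts the RPC server. Concretely, it sends a message telling the underlying AkkaRpcActor to switch to the START state, and the actor then calls the endpoint's onStart hook. We can therefore go straight to TaskExecutor's onStart method.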
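Here is a small sketch of the START handshake. It assumes a single-threaded executor as a stand-in for the actor's main thread. start() only enqueues the state switch, and the onStart hook then runs on that thread rather than on the caller's thread. SketchEndpoint and SketchTaskExecutor are hypothetical classes, not Flink's RpcEndpoint or AkkaRpcActor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint: a single-threaded executor plays the actor's "main thread".
abstract class SketchEndpoint implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private volatile boolean started;

    // Like rpcServer.start(): the state switch and onStart() happen asynchronously.
    final CompletableFuture<Void> start() {
        return CompletableFuture.runAsync(() -> {
            started = true; // switch to the "START" state
            onStart();      // then invoke the subclass hook on the main thread
        }, mainThread);
    }

    boolean isStarted() {
        return started;
    }

    protected abstract void onStart();

    @Override
    public void close() {
        mainThread.shutdown();
    }
}

final class SketchTaskExecutor extends SketchEndpoint {
    volatile String startedOn;

    @Override
    protected void onStart() {
        // In TaskExecutor, this is where startTaskExecutorServices() would be called.
        startedOn = Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        try (SketchTaskExecutor te = new SketchTaskExecutor()) {
            te.start().join();
            System.out.println("started=" + te.isStarted() + " on thread " + te.startedOn);
        }
    }
}
```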

4. TaskExecutor#onStart->TaskExecutor#startTaskExecutorServices

	private void startTaskExecutorServices() throws Exception {
		try {
			// start by connecting to the ResourceManager
			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
			// tell the task slot table who's responsible for the task slot actions
			taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
			// start the job leader service
			jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
			fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
		} catch (Exception e) {
			handleStartTaskExecutorServicesException(e);
		}
	}

5. StandaloneLeaderRetrievalService#start->TaskExecutor.ResourceManagerLeaderListener (inner class of TaskExecutor)#notifyLeaderAddress->TaskExecutor#notifyOfNewResourceManagerLeader->TaskExecutor#reconnectToResourceManager->TaskExecutor#tryConnectToResourceManager->TaskExecutor#connectToResourceManager

	private void connectToResourceManager() {
		assert(resourceManagerAddress != null);
		assert(establishedResourceManagerConnection == null);
		assert(resourceManagerConnection == null);
		log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
		final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
			getAddress(),
			getResourceID(),
			unresolvedTaskManagerLocation.getDataPort(),
			JMXService.getPort().orElse(-1),
			hardwareDescription,
			memoryConfiguration,
			taskManagerConfiguration.getDefaultSlotResourceProfile(),
			taskManagerConfiguration.getTotalResourceProfile());
		resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
			log,
			getRpcService(),
			taskManagerConfiguration.getRetryingRegistrationConfiguration(),
			resourceManagerAddress.getAddress(),
			resourceManagerAddress.getResourceManagerId(),
			getMainThreadExecutor(),
			new ResourceManagerRegistrationListener(),
			taskExecutorRegistration);
		resourceManagerConnection.start();
	}
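The call chain in step 5 is a listener callback. In standalone mode the ResourceManager address is known up front, so start() can notify the listener immediately. The TaskExecutor reacts by dropping any existing connection and connecting to the new address. The minimal sketch below models that flow. LeaderListenerSketch, StandaloneRetrievalSketch and ReconnectingClientSketch are hypothetical names, and strings in an event list stand in for real connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical listener, loosely modelled on the notifyLeaderAddress callback.
interface LeaderListenerSketch {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// "Standalone" retrieval: the leader address is fixed, so start() notifies at once.
final class StandaloneRetrievalSketch {
    private final String address;
    private final UUID sessionId;

    StandaloneRetrievalSketch(String address, UUID sessionId) {
        this.address = address;
        this.sessionId = sessionId;
    }

    void start(LeaderListenerSketch listener) {
        listener.notifyLeaderAddress(address, sessionId);
    }
}

// Plays the TaskExecutor side: on a new leader, close the old connection
// (reconnect) and then connect to the new address if there is one (tryConnect).
final class ReconnectingClientSketch implements LeaderListenerSketch {
    final List<String> events = new ArrayList<>();
    private String currentAddress;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (currentAddress != null) {
            events.add("close " + currentAddress);
        }
        currentAddress = leaderAddress;
        if (leaderAddress != null) {
            events.add("connect " + leaderAddress);
        }
    }

    public static void main(String[] args) {
        ReconnectingClientSketch client = new ReconnectingClientSketch();
        new StandaloneRetrievalSketch("rm-address", UUID.randomUUID()).start(client);
        client.notifyLeaderAddress("rm-address-2", UUID.randomUUID()); // simulate a leader change
        System.out.println(client.events);
        // prints: [connect rm-address, close rm-address, connect rm-address-2]
    }
}
```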

6. RegisteredRpcConnection#start

	public void start() {
		checkState(!closed, "The RPC connection is already closed");
		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
			newRegistration.startRegistration();
		} else {
			// concurrent start operation
			newRegistration.cancel();
		}
	}

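①createNewRegistration

Creates a new RetryingRegistration, through which the TaskExecutor (TE) registers with the ResourceManager (RM).

②startRegistration

Starts this registration. The new registration is installed via compareAndSet. If a concurrent start() has already installed one, the new registration is canceled instead.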
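The guard in start() uses compare-and-set so that only one pending registration can ever be installed. Flink does this with a static AtomicReferenceFieldUpdater (REGISTRATION_UPDATER). The sketch below uses an AtomicReference instead, which has the same CAS semantics. PendingRegistration is a hypothetical placeholder for RetryingRegistration. The sketch also deliberately omits the checkState preconditions. In Flink a second sequential start() would already fail there, so the cancel branch only matters when two calls race past those checks.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "install at most one pending registration" guard.
final class SingleStartSketch {

    // Placeholder for RetryingRegistration: only records what happened to it.
    static final class PendingRegistration {
        boolean started;
        boolean canceled;

        void startRegistration() { started = true; }

        void cancel() { canceled = true; }
    }

    private final AtomicReference<PendingRegistration> pending = new AtomicReference<>();

    // Returns the registration created by this call (either started or canceled).
    PendingRegistration start() {
        PendingRegistration newRegistration = new PendingRegistration();
        if (pending.compareAndSet(null, newRegistration)) {
            newRegistration.startRegistration(); // we won the race: start it
        } else {
            newRegistration.cancel();            // someone else won: discard ours
        }
        return newRegistration;
    }

    public static void main(String[] args) {
        SingleStartSketch connection = new SingleStartSketch();
        PendingRegistration first = connection.start();
        PendingRegistration second = connection.start();
        System.out.println(first.started + " " + second.canceled); // prints: true true
    }
}
```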

7. RetryingRegistration#startRegistration

	public void startRegistration() {
		if (canceled) {
			// we already got canceled
			return;
		}
		try {
			// trigger resolution of the target address to a callable gateway
			final CompletableFuture<G> rpcGatewayFuture;
			if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
				rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
					targetAddress,
					fencingToken,
					targetType.asSubclass(FencedRpcGateway.class));
			} else {
				rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
			}
			// upon success, start the registration attempts
			CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
				(G rpcGateway) -> {
					log.info("Resolved {} address, beginning registration", targetName);
					register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
				},
				rpcService.getExecutor());
			// upon failure, retry, unless this is cancelled
			rpcGatewayAcceptFuture.whenCompleteAsync(
				(Void v, Throwable failure) -> {
					if (failure != null && !canceled) {
						final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
						if (log.isDebugEnabled()) {
							log.debug(
								"Could not resolve {} address {}, retrying in {} ms.",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure);
						} else {
							log.info(
								"Could not resolve {} address {}, retrying in {} ms: {}",
								targetName,
								targetAddress,
								retryingRegistrationConfiguration.getErrorDelayMillis(),
								strippedFailure.getMessage());
						}
						startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
					}
				},
				rpcService.getExecutor());
		}
		catch (Throwable t) {
			completionFuture.completeExceptionally(t);
			cancel();
		}
	}

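①rpcService.connect

Resolves the target address into a callable gateway. The fenced variant is used when the target type is a FencedRpcGateway.

②register

Once resolution succeeds, the registration attempts begin, starting with attempt 1 and the initial registration timeout. If resolution fails and the registration has not been canceled, startRegistration is retried after the configured error delay.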
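The control flow of startRegistration reduces to three steps. It resolves a gateway asynchronously, registers once the gateway is available, and schedules a retry after a fixed error delay if resolution fails and the registration was not canceled. The sketch below makes several assumptions. A resolver returning CompletableFuture<String> replaces rpcService.connect, completing a future replaces register, and a single-threaded scheduler replaces startRegistrationLater. RetryingResolveSketch and its members are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the resolve -> register -> retry-on-failure flow.
final class RetryingResolveSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Supplier<CompletableFuture<String>> resolver; // stand-in for rpcService.connect(...)
    private final long errorDelayMillis;                       // stand-in for getErrorDelayMillis()
    private volatile boolean canceled;

    final CompletableFuture<String> completion = new CompletableFuture<>();
    final AtomicInteger resolveAttempts = new AtomicInteger();

    RetryingResolveSketch(Supplier<CompletableFuture<String>> resolver, long errorDelayMillis) {
        this.resolver = resolver;
        this.errorDelayMillis = errorDelayMillis;
    }

    void startRegistration() {
        if (canceled) {
            return; // we already got canceled
        }
        resolveAttempts.incrementAndGet();
        resolver.get()
            // upon success, "register" with the resolved gateway
            .thenAcceptAsync(gateway -> completion.complete("registered at " + gateway), executor)
            // upon failure, retry after the error delay, unless canceled
            .whenCompleteAsync((ignored, failure) -> {
                if (failure != null && !canceled) {
                    executor.schedule(this::startRegistration, errorDelayMillis, TimeUnit.MILLISECONDS);
                }
            }, executor);
    }

    // Stops further retries (an attempt already in flight may still complete).
    void cancel() {
        canceled = true;
    }

    void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        RetryingResolveSketch registration = new RetryingResolveSketch(() -> {
            CompletableFuture<String> gateway = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                gateway.completeExceptionally(new IllegalStateException("address not resolvable yet"));
            } else {
                gateway.complete("rm-gateway");
            }
            return gateway;
        }, 10L);
        registration.startRegistration();
        System.out.println(registration.completion.join()
            + " after " + registration.resolveAttempts.get() + " attempts");
        registration.close();
    }
}
```

In the usage example the resolver fails twice and succeeds on the third call. Two delayed retries are scheduled before completion is fulfilled.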

8. RetryingRegistration#register->TaskExecutorToResourceManagerConnection.ResourceManagerRegistration (inner class of TaskExecutorToResourceManagerConnection)#invokeRegistration->ResourceManager#registerTaskExecutorInternal

	private RegistrationResponse registerTaskExecutorInternal(
			TaskExecutorGateway taskExecutorGateway,
			TaskExecutorRegistration taskExecutorRegistration) {
		ResourceID taskExecutorResourceId = taskExecutorRegistration.getResourceId();
		WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
		if (oldRegistration != null) {
			// TODO :: suggest old taskExecutor to stop itself
			log.debug("Replacing old registration of TaskExecutor {}.", taskExecutorResourceId.getStringWithMetadata());
			// remove old task manager registration from slot manager
			slotManager.unregisterTaskManager(
				oldRegistration.getInstanceID(),
				new ResourceManagerException(
					String.format("TaskExecutor %s re-connected to the ResourceManager.", taskExecutorResourceId.getStringWithMetadata())));
		}
		final WorkerType newWorker = workerStarted(taskExecutorResourceId);
		String taskExecutorAddress = taskExecutorRegistration.getTaskExecutorAddress();
		if (newWorker == null) {
			log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did " +
				"not recognize it", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			return new RegistrationResponse.Decline("unrecognized TaskExecutor");
		} else {
			WorkerRegistration<WorkerType> registration = new WorkerRegistration<>(
				taskExecutorGateway,
				newWorker,
				taskExecutorRegistration.getDataPort(),
				taskExecutorRegistration.getJmxPort(),
				taskExecutorRegistration.getHardwareDescription(),
				taskExecutorRegistration.getMemoryConfiguration());
			log.info("Registering TaskManager with ResourceID {} ({}) at ResourceManager", taskExecutorResourceId.getStringWithMetadata(), taskExecutorAddress);
			taskExecutors.put(taskExecutorResourceId, registration);
			taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				@Override
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				@Override
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
			});
			return new TaskExecutorRegistrationSuccess(
				registration.getInstanceID(),
				resourceId,
				clusterInformation);
		}
	}

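①getResourceId

Gets the TaskExecutor's resource ID.

②taskExecutors.remove

Removes any previously cached registration for this resource ID from taskExecutors.

③slotManager.unregisterTaskManager

If an old registration existed, unregisters the old TaskManager from the SlotManager.

④workerStarted

Callback invoked when a worker has been started. It returns the WorkerType for this resource ID, or null if the framework does not recognize the worker. In that case the registration is declined.

⑤getTaskExecutorAddress

Gets the TaskExecutor's address.

⑥new WorkerRegistration<>

Builds the WorkerRegistration.

⑦log.info, taskExecutors.put

Logs that the TaskManager is being registered at the ResourceManager, then puts the registration into the taskExecutors cache.

⑧taskManagerHeartbeatManager.monitorTarget

Registers the TaskManager as a monitored heartbeat target. The ResourceManager sends heartbeat requests to it via heartbeatFromResourceManager.

⑨new TaskExecutorRegistrationSuccess

Creates and returns the registration-success response.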
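Steps ② to ⑨ amount to a small registry with replace-on-reconnect semantics. The following is a simplified model under stated assumptions. Strings and UUIDs replace Flink's ResourceID, InstanceID and RegistrationResponse types. A fixed set of recognized IDs plays the role of workerStarted, and a set of IDs stands in for the heartbeat manager. WorkerRegistrySketch is a hypothetical class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, heavily simplified model of registerTaskExecutorInternal.
final class WorkerRegistrySketch {
    private final Map<String, UUID> taskExecutors = new HashMap<>(); // resource ID -> instance ID
    private final Set<String> recognizedWorkers;                     // plays the role of workerStarted(...)
    final Set<String> heartbeatTargets = new HashSet<>();            // plays the role of monitorTarget(...)
    final Set<UUID> unregisteredInstances = new HashSet<>();         // plays the role of unregisterTaskManager(...)

    WorkerRegistrySketch(Set<String> recognizedWorkers) {
        this.recognizedWorkers = recognizedWorkers;
    }

    String register(String resourceId) {
        // steps 2-3: drop any previous registration for this resource ID
        UUID oldInstanceId = taskExecutors.remove(resourceId);
        if (oldInstanceId != null) {
            unregisteredInstances.add(oldInstanceId);
        }
        // step 4: decline workers the framework does not recognize
        if (!recognizedWorkers.contains(resourceId)) {
            return "Decline: unrecognized TaskExecutor";
        }
        // steps 6-7: build and cache a new registration with a fresh instance ID
        UUID instanceId = UUID.randomUUID();
        taskExecutors.put(resourceId, instanceId);
        // step 8: monitor it as a heartbeat target
        heartbeatTargets.add(resourceId);
        // step 9: report success
        return "Success: " + instanceId;
    }

    int registeredCount() {
        return taskExecutors.size();
    }

    public static void main(String[] args) {
        WorkerRegistrySketch rm = new WorkerRegistrySketch(new HashSet<>(Arrays.asList("tm-1")));
        System.out.println(rm.register("tm-1").startsWith("Success")); // true
        System.out.println(rm.register("tm-1").startsWith("Success")); // true: the old entry is replaced
        System.out.println(rm.unregisteredInstances.size());           // 1
        System.out.println(rm.register("tm-x"));                       // Decline: unrecognized TaskExecutor
    }
}
```

A second register("tm-1") call replaces the first entry and records the old instance ID as unregistered, which mirrors the slotManager.unregisterTaskManager call. An unknown ID is declined.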

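Registration is an RPC call, so once the success response is returned, the onRegistrationSuccess callback on the TaskExecutor side is invoked. We will analyze the rest in the next installment.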

Overview

[Figure: flow chart of the source code covered in this installment]

See you next time!
