本文主要是介绍【Yarn】Yarn的基本执行流程(二)AM Container的启动,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Yarn的基本执行流程之AM Container的启动
文章目录
- Yarn的基本执行流程之AM Container的启动
- AM Container(第一个Container)的启动
- NM RM心跳交互触发调度Container的启动流程
- RM中调度启动AM流程
- AMLauncher启动流程
- NM上容器的启动流程
- 下载资源
- AM Container 启动与运行
- NM RM心跳交互触发调度更新Container的状态
- AM启动之后怎么注册到RM上
- AM怎么申请运行其他Container
AM Container(第一个Container)的启动
NM RM心跳交互触发调度Container的启动流程
当有NM节点向RM发送心跳请求时,RM内部最终会以事件的形式通知到调度器,调度器则选择合适的应用为其分配资源。
- NM上的节点NodeStatusUpdater服务的实现NodeStatusUpdaterImpl与ResourceTrackerService保持心跳联系,定时发送信息。
- ResourceTrackerService验证相关信息后,会向调度器发送RMNodeStatusEvent事件。
- RMNodeImpl向调度器发送NodeUpdateSchedulerEvent事件,由FairScheduler进行nodeUpdate处理,其中就包含了启动AM流程。
RM中调度启动AM流程
- FairSchduler在更新节点时会调用attemptScheduling方法,尝试为Container分配资源
- FSAppAttempt申请分配Container资源,实例化Container。
- 申请到了Container资源,触发
RMContainerEventType.START
事件。在完成ContainerStartedTransition方法后,RMContainerState.NEW将变为RMContainerState.ALLOCATED。 - ContainerStartedTransition方法发送了
RMAppAttemptEventType.CONTAINER_ALLOCATED
事件,触发AMContainerAllocatedTransition方法,RMAppAttemptState.SCHEDULED将变为RMAppAttemptState.ALLOCATED_SAVING。 - 从调度器获取启动 AM 的 Container。
- 把之前申请资源拿走。
- 处理
RMContainerEventType.ACQUIRED
事件,RMContainerState.ALLOCATED将变为RMContainerState.ACQUIRED - 把当前Container所处的Node放入上下文,RMAppState.ACCEPTED保持不变
- 当第5步开始的链路结束后,回到RMAppAttemptImpl,执行异步存储Attempt信息。
- 处理
RMStateStoreEventType.STORE_APP_ATTEMPT
事件,RMStateStore.storeApplicationAttemptStateInternal
持久化存储,是存ZK??? - 存储成功时候,发送
RMAppAttemptEventType.ATTEMPT_NEW_SAVED
事件,RMAppAttemptState.ALLOCATED_SAVING将变为RMAppAttemptState.ALLOCATED。 - 执行registerClientToken()注册客户端token,向处理器发送
AMLauncherEventType.LAUNCH
事件。处理器是将事件交由ApplicationMasterLauncher继续处理AMLauncher启动流程。
AMLauncher启动流程
ResourceManager启动的时候会将会创建ApplicationMasterLauncher服务,用来进行AM Container的启动与关闭。
-
ApplicationMasterLauncher回去创建一个AMLauncher实例,用来启动AM Container。
-
将AMLauncher实例放入masterEvents队列。AMLauncher实例的处理是一种异步的处理方式。
-
在实例化ApplicationMasterLauncher时,会创建一个LauncherThread线程实例,用来调度处理masterEvents。
-
LauncherThread从队列中获取AMLauncher调度执行。
-
执行launch()方法,这里面包含了与NM交互,具体启动NM上Container的部分。
-
向事件处理器发送
RMAppAttemptEventType.LAUNCHED
事件,调度执行RMAppAttemptImpl的AMLaunchedTransition(),此时认为AM container已经启动,等待AM container注册到RN上,将刚刚启动的 ApplicationMaster 注册到 AMLivelinessMonitor,启动心跳监控。RMAppAttemptState.ALLOCATED将变为 RMAppAttemptState.LAUNCHED。
NM上容器的启动流程
由ContainerManagerImpl被调用到startContainers()方法展开。遍历StartContainerRequests,处理每个StartContainerRequest。具体操作是在startContainerInternal() 方法。
下载资源
- 创建一个ContainerImpl实例,初始化相关信息。
- 为新的appc创建一个Application实例,即app的AM Container来初始化。
- 获取日志聚合上下文,将Application信息,存储下来。
- 向调度器发送
ApplicationInitEvent(ApplicationEventType.INIT_APPLICATION)
事件,调用AppInitTransition()方法,执行成功后会将ApplicationState.INITING变成ApplicationState.RUNNING - 发送
LogHandlerAppStartedEvent(LogHandlerEventType.APPLICATION_STARTED)
事件,由LogAggregationService
来处理。会为APP创建一个AppLogAggregatorImpl实例,并由线程池调度它。 - 向调度器发送
ApplicationEvent(ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
事件,触发AppLogInitDoneTransition()
方法。 - 向调度器发送
ApplicationLocalizationEvent(LocalizationEventType.INIT_APPLICATION_RESOURCES)
事件,由ResourceLocalizationService
来处理。开始对app的创建应用程序资源跟踪。 - 向调度器发送
ApplicationInitedEvent(ApplicationEventType.APPLICATION_INITED)
事件,调用ApplicationImpl中的AppInitDoneTransition方法处理。 - 遍历此APP中的所有Container(需要建立的),向调度器发送
ContainerInitEvent(ContainerEventType.INIT_CONTAINER)
事件,由ContainerImpl的RequestResourcesTransition方法处理,ContainerState.LOCALIZING.NEW将变为ContainerState.LOCALIZING - 会发送辅助服务的相关事件
AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT)
和AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT)
暂时展开,之后发送ContainerLocalizationRequestEvent
事件,进行容器资源的本地化。 - 资源本地服务ResourceLocalizationService为每个资源请求构建LocalResourcesTracker(LocalResourcesTrackerImpl)。
- 由LocalResourcesTrackerImpl发送
ResourceRequestEvent(ResourceEventType.REQUEST)
事件,ResourceState.INIT将变为ResourceState.DOWNLOADING - 转发
LocalizerResourceRequestEvent<br>(LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION)
事件给ResourceLocalizationService - 按照资源的分类交交由PublicLocalizer和LocalizerRunner两个线程进行处理,此时该容器应该是没有对应的LocalizerRunner的,所以先进行LocalizerRunner实例化之后再启动。将PRIVATE和APPLICATION资源交给LocalizerRunner。
- 请求公共资源下载交给PublicLocalizer。
- 发送
ResourceLocalizedEvent(ResourceEventType.LOCALIZED)
事件,交由LocalizedResource处理。 - 没当一个下载完成之后,发送
ContainerEventType.RESOURCE_LOCALIZED
事件,触发该容器的全部资源的检查,如果还有资源没下载,就保持ContainerState.LOCALIZING
,等待下次事件触发。 - 如果全部资源下载完成,则发送
ContainerLocalizationEvent(LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED)
事件给ResourceLocalizationService,去销毁该容器的LocalizerRunner线程, - 发送
ContainerSchedulerEvent<br>(ContainerSchedulerEventType.SCHEDULE_CONTAINER)
事件给ContainerScheduler。
此时ContainerState.LOCALIZING变为ContainerState.SCHEDULED。
AM Container 启动与运行
-
尝试启动pending的容器们,在资源满足情况的前提下,遍历容器,按队列顺序向ContainerImpl发送启动事件,直到无法满足。
-
向容器启动器发送
ContainersLauncherEvent<br>(ContainersLauncherEventType.LAUNCH_CONTAINER)
事件 -
创建ContainerLaunch实例,并由线程池启动该实例线程
-
更新容器信息到对应的日志目录和工作目录,将待运行的 Container 所需的环境和运行命令写到 Shell 脚本中
launch_container.sh
脚本中,并将启动该脚本的命令写入default_container_executro.sh
中,然后通过该脚本启动 Container。在这里主要对执行的环境和内容进行相关配置。ContainerState.SCHEDULED将变为ContainerState.RUNNING -
发送
ContainerStartMonitoringEvent
事件,开启容器监控。此刻以及认为容器以及启动了。 -
将容器的启动信息更新进持久化存储
-
发送启动命令给默认容器执行器(DefaultContainerExecutor),真正启动 Container。
在DefaultContainerExecutor的launchContainer方法中会执行
bash default_container_executor.sh
命令,default_container_executor.sh
脚本的包含部分内容有:,其中huatuo
是队列名称,/data10/yarn
是配置yarn.nodemanager.local-dirs
的值。/bin/bash "/data10/yarn/usercache/huatuo/appcache/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001/default_container_executor.sh"
调用了
default_container_executor_session.sh
脚本#!/bin/bash ... exec /bin/bash "/data10/yarn/usercache/huatuo/appcache/appcache/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001/launch_container.sh"
最后调用了
launch_container.sh
脚本,内容如下:... echo "Launching container" exec /bin/bash -c "$JAVA_HOME/bin/java -server -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Xmx2048m -Djava.io.tmpdir=$PWD/tmp '-XX:MaxMetaspaceSize=512m' '-XX:+PrintGCDetails' '-XX:+PrintGCDateStamps' '-XX:+UseParNewGC' '-XX:+UseConcMarkSweepGC' '-XX:CMSInitiatingOccupancyFraction=70' '-XX:+UseCMSInitiatingOccupancyOnly' '-XX:+ExplicitGCInvokesConcurrent' '-XX:ParallelGCThreads=4' '-XX:ConcGCThreads=2' '-XX:GCTimeLimit=90' '-XX:GCHeapFreeLimit=10' '-XX:OnOutOfMemoryError=kill %p' '-Dfastjson.parser.safeMode=true' -Xloggc:/data10/logs/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001/gclog -Dspark.yarn.app.container.log.dir=/data10/logs/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'com.suning.bigquery.worker.BigqueryWorker' --jar file:/home/bigquery/software/bigquery-3.0.2/lib/worker/bigquery-worker-3.0.2.jar --arg 'huatuo' --arg '0' --arg 'namenode1-sit.cnsuning.com:2015,namenode2-sit.cnsuning.com:2015,slave01-sit.cnsuning.com:2015' --properties-file $PWD/__spark_conf__/__spark_conf__.properties --dist-cache-conf $PWD/__spark_conf__/__spark_dist_cache__.properties 1> /data10/logs/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001/stdout 2> /data10/logs/application_1723033197835_26906/container_e87_1723033197835_26906_01_000001/stderr"
这个container是个Spark任务,所以这里调用的是
org.apache.spark.deploy.yarn.ApplicationMaster
,并将标准输出写到stdout中,将标准错误输出写到stderr中。这也就是container的log目录里有三个文件的原因。至此,AM Container启动完毕。
NM RM心跳交互触发调度更新Container的状态
RMContainerState.ACQUIRED, RMContainerState.RUNNING
AM启动之后怎么注册到RM上
不同的任务类型在AM Container 启动的类不同,由这个l类管理进行AM注册到RM上
-
mapreduce任务的AM上启动类:
org.apache.hadoop.mapreduce.v2.app.MRAppMaster
-
spark任务的AM上启动类:
org.apache.spark.deploy.yarn.ApplicationMaster
-
flink任务的AM上启动类:
org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
以MRAppMaster为例:
- Container启动的时候会去加载实例化MRAppMaster类,由MRAppMaster类中去创建RMContainerAllocator。
- 通过服务框架进行服务的初始化
- 通过服务框架进行服务的启动
- 与RM基于ApplicationMasterProtocol协议建立连接,调用执行ApplicationMasterService的registerApplicationMaster方法,
- 通过AMS处理链进行转发
- 获取节点上的需要更新的Container,即是获取nodeUpdateQueue队列中的元素,其中包含了新启动的Container。
- 触发 RMAppAttemptRegistrationEvent (RMAppAttemptEventType.REGISTERED) 事件,执行AMRegisteredTransition方法,RMAppAttemptState.ALLOCATED将变成RMAppAttemptState.RUNNING,
- 触发 RMAppEvent (RMAppEventType.ATTEMPT_REGISTERED) 事件,执行方法AMRegisteredTransition(),RMAppState.ACCEPTED将变成RMAppState.RUNNING
AM怎么申请运行其他Container
以MRAppMaster为例:
在MRAppMaster启动中会创建一个JobImpl,由JobImpl计算出需要的MapTasks和ReduceTasks的个数。在RMCommunicator中会独立有一个AllocatorRunnable线程,在注册AM之后,会启动该线程,通过心跳机制,请求与RM中ApplicationMasterService服务的allocate
方法,进行资源申请的调度。
1~4为AM注册到RM上流程(同上)
- 在注册AM的之后,会启动分配器线程
- AllocatorRunnable与RMContainerAllocator之前有心跳交互
- 开始触发向RM申请接下来任务需要容器的资源流程
- 请求与RM中ApplicationMasterService服务的
allocate
方法,获取分配结果,去开始对task进行分配和信息填充。 - 检查分配的Container信息情况
- 为task构建启动Context信息
- 触发ContainerRemoteLaunchEvent(ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)事件,准备向分配的目标NodeManager发送启动容器的请求
- 向分配的目标NodeManager发送启动容器请求
- 从之前一些注册的心跳回调方法的队列heartbeatCallbacks中获取Runnable并执行
NM上的APP相关的任务会通过心跳机制来进行交互,更新运行情况。
【Yarn】Yarn的基本执行流程(三) 应用运行结束流程
这篇关于【Yarn】Yarn的基本执行流程(二)AM Container的启动的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!