Hadoop-Yarn-NodeManager都做了什么

2024-02-18 11:04
文章标签 hadoop yarn nodemanager

本文主要是介绍Hadoop-Yarn-NodeManager都做了什么,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在我的<Hadoop-Yarn-启动篇>博客中已经简要的分析了NodeManager的启动过程,NodeManager是管理整个集群资源的直接角色,因此我们有必要细致的分析下NodeManager都做了什么,一般Hadoop源码中各个角色启动时都是在serviceInit()方法中初始化该角色所需要的服务并添加到服务列表,在serviceStart()中依次启动各个服务,下面我们就依次来分析下NodeManager中所有的服务已经每个服务都做了什么。

三、NodeManager服务列表

在我的<Hadoop-Yarn-启动篇>博客中的java部分>NodeManager>serviceInit中有详细的源码,下面我们依次来列举下:

1、NMLeveldbStateStoreService

2、DeletionService

3、NodeHealthCheckerService

4、NodeLabelsProvider

5、NodeAttributesProvider

6、NodeResourceMonitorImpl

7、ContainerManagerImpl

8、NMLogAggregationStatusTracker

9、WebServer

10、AsyncDispatcher

11、JvmPauseMonitor

12、NodeStatusUpdater

四、NodeManager服务源码分析

1、NMLeveldbStateStoreService

//父类
public abstract class NMStateStoreService extends AbstractService {//......省略......//初始化状态存储public void serviceInit(Configuration conf) throws IOException {initStorage(conf);}//启动状态存储以供使用public void serviceStart() throws IOException {startStorage();}//......省略......
}//子类
public class NMLeveldbStateStoreService extends NMStateStoreService {//......省略......protected void initStorage(Configuration conf)throws IOException {//获取 yarn.nodemanager.recovery.dir 值 并依次作为根目录进行创建//默认值为 ${hadoop.tmp.dir}/yarn-nm-recovery //启用恢复时,节点管理器将在其中存储状态的本地文件系统目录。//并将其中的内容封装成 LevelDB 的形式//LevelDB 是一种为分布式而生的键-值数据库 Chrome 浏览器中涉及的 IndexedDB(基于 HTML5 API 的数据库),就是基于 LevelDB 构建而成的db = openDatabase(conf);//校验版本checkVersion();//开始准备计时//获取 yarn.nodemanager.recovery.compaction-interval-secs 的值,默认值 3600//换算成毫秒为 3600 * 1000//创建一个新计时器,该计时器的关联线程具有指定的名称,并且可以指定为作为守护进程运行。//开始按照计时器调度任务startCompactionTimer(conf);}protected void startStorage() throws IOException {//假设我们开始时是健康的isHealthy = true;}//......省略......
}

2、DeletionService

public class DeletionService extends AbstractService {//ContainerExecutor : 用于在底层操作系统上启动容器的机制的抽象。所有执行器实现都必须扩展ContainerExecutor。//public DeletionService(ContainerExecutor containerExecutor,NMStateStoreService stateStore) {super(DeletionService.class.getName());this.containerExecutor = containerExecutor;this.debugDelay = 0;this.stateStore = stateStore;}protected void serviceInit(Configuration conf) throws Exception {ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();if (conf != null) {//线程池:ScheduledThreadPoolExecutor的扩展,提供附加功能,主要是提供了周期任务和延迟任务相关的操作//获取 yarn.nodemanager.delete.thread-count 的值 默认值 4 (清理中使用的线程数)sched = new HadoopScheduledThreadPoolExecutor(conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);//删除资源之前延迟,以便于调试NM问题//获取 yarn.nodemanager.delete.debug-delay-sec 的值 默认为 0debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);} else {sched = new HadoopScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);}//设置是否执行现有延迟任务的策略,即使此执行器已关闭。在这种情况下,这些任务只会在shutdownNow时终止,或者在已经关闭时将策略设置为false后终止。默认情况下,此值为true。sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);//设置线程在终止之前可以保持空闲的时间限制。如果池中当前的线程数超过核心线程数,则在等待此时间而不处理任务后,多余的线程将被终止。这将覆盖构造函数中设置的任何值。sched.setKeepAliveTime(60L, SECONDS);if (stateStore.canRecover()) {//恢复操作recover(stateStore.loadDeletionServiceState());}super.serviceInit(conf);}
)

3、NodeHealthCheckerService

NodeManager添加服务代码

  public static NodeHealthScriptRunner getNodeHealthScriptRunner(Configuration conf) {//运行状况检查脚本//获取 yarn.nodemanager.health-checker.script.path 的值 String nodeHealthScript = conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);if(!NodeHealthScriptRunner.shouldRun(nodeHealthScript)) {LOG.info("Node Manager health check script is not available "+ "or doesn't have execute permission, so not "+ "starting the node health script runner.");return null;}//运行脚本的频率//yarn.nodemanager.health-checker.interval-ms 默认值 10 * 60 * 1000 即10分钟long nmCheckintervalTime = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);//脚本超时期//yarn.nodemanager.health-checker.script.timeout-ms 默认值 2 * 10 * 60 * 1000 即20分钟long scriptTimeout = conf.getLong(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);//脚本参数//yarn.nodemanager.health-checker.script.opts String[] scriptArgs = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});//是否在NM启动前运行节点运行状况脚本//yarn.nodemanager.health-checker.run-before-startup 默认值 falseboolean runBeforeStartup = conf.getBoolean(YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);//NodeHealthScriptRunner 提供使用配置的节点运行状况脚本检查节点运行状况并向要求运行状况检查器报告的服务报告的功能的类。return new NodeHealthScriptRunner(nodeHealthScript,nmCheckintervalTime, scriptTimeout, scriptArgs, runBeforeStartup);}nodeHealthChecker =new NodeHealthCheckerService(getNodeHealthScriptRunner(conf), dirsHandler);addService(nodeHealthChecker);

NodeHealthScriptRunner负责运行状况脚本检查并向提供服务报告

public class NodeHealthScriptRunner extends AbstractService {//用于初始化脚本路径和间隔时间的值protected void serviceInit(Configuration conf) throws Exception {super.serviceInit(conf);}protected void serviceStart() throws Exception {nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);long delay = 0;if (runBeforeStartup) {//立即启动计时器任务,等待其返回//构造器中赋值,NodeHealthMonitorExecutor是它的内部类,该类用于定期执行节点运行状况脚本//private TimerTask timer = new NodeHealthMonitorExecutor(scriptArgs);timer.run();delay = intervalTime;}//将脚本设置为每隔一段时间定期运行nodeHealthScriptScheduler.scheduleAtFixedRate(timer, delay,intervalTime);super.serviceStart();}private class NodeHealthMonitorExecutor extends TimerTask {String exceptionStackTrace = "";public NodeHealthMonitorExecutor(String[] args) {ArrayList<String> execScript = new ArrayList<String>();execScript.add(nodeHealthScript);if (args != null) {execScript.addAll(Arrays.asList(args));}//ShellCommandExecutor是一个简单的shell命令执行器//如果命令的输出不需要显式解析,并且命令、工作目录和环境保持不变,则应使用ShellCommandExecutor。命令的输出按原样存储,并且预计输出较小。shexec = new ShellCommandExecutor(execScript.toArray(new String[execScript.size()]), null, null, scriptTimeout);}@Overridepublic void run() {HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;try {//执行命令shexec.execute();} catch (ExitCodeException e) {// 忽略脚本的退出代码status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;//在Windows上,我们不会遇到stdout缓冲读取器为超时事件抛出的流关闭IOException。if (Shell.WINDOWS && shexec.isTimedOut()) {status = HealthCheckerExitStatus.TIMED_OUT;}} catch (Exception e) {LOG.warn("Caught exception : " + e.getMessage());if (!shexec.isTimedOut()) {status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;} else {status = HealthCheckerExitStatus.TIMED_OUT;}exceptionStackTrace = StringUtils.stringifyException(e);} finally {if (status == HealthCheckerExitStatus.SUCCESS) {if (hasErrors(shexec.getOutput())) {status = HealthCheckerExitStatus.FAILED;}}reportHealthStatus(status);}}/*** 用于解析节点运行状况监视器的输出并发送到报告地址* * 超时的脚本或导致IOException输出的脚本被忽略* * 如果有以下一种情况,节点被标记为不正常*     1、节点运行状况脚本超时*     2、节点运行状况脚本输出有一行以ERROR开头*     3、执行脚本时引发异常* * 如果脚本抛出{@link IOException}或{@link ExitCodeException},输出将被忽略,* 节点将保持健康,因为脚本可能存在语法错误。* @param status*/void reportHealthStatus(HealthCheckerExitStatus status) {long now = System.currentTimeMillis();switch (status) {case SUCCESS:setHealthStatus(true, "", now);break;case TIMED_OUT:setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);break;case FAILED_WITH_EXCEPTION:setHealthStatus(false, exceptionStackTrace);break;case FAILED_WITH_EXIT_CODE:// see Javadoc above - we don't report bad health intentionallysetHealthStatus(true, "", now);break;case FAILED:setHealthStatus(false, shexec.getOutput());break;}}/*** 检查输出字符串是否有以ERROR开头的行*/private boolean hasErrors(String output) {String[] splits = output.split("\n");for (String split : splits) {if (split.startsWith(ERROR_PATTERN)) {return true;}}return false;}}}

4、NodeLabelsProvider

默认不开启启用分布式节点标签配置

protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)throws IOException {NodeLabelsProvider provider = null;//获取  yarn.nodemanager.node-labels.provider 的值 默认为空//官方解释://当在RM中将“yarn.node labels.configuration type”配置为“distributed”时,管理员可以通过配置此参数在NM中配置节点标签的提供程序。管理员可以配置“config”、“script”或提供程序的类名。配置的类需要扩展org.apache.hadoop.yarn.server.nodemanager.nodelabels。节点标签提供者。如果配置了“config”,则使用“ConfigurationNodeLabelsProvider”;如果配置“script”,则将使用“ScriptNodeLabelsProvider”。String providerString =conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null);if (providerString == null || providerString.trim().length() == 0) {//似乎未启用分布式节点标签配置return provider;}switch (providerString.trim().toLowerCase()) {case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:provider = new ConfigurationNodeLabelsProvider();break;case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:provider = new ScriptBasedNodeLabelsProvider();break;default:try {Class<? extends NodeLabelsProvider> labelsProviderClass =conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,null, NodeLabelsProvider.class);provider = labelsProviderClass.newInstance();} catch (InstantiationException | IllegalAccessException| RuntimeException e) {LOG.error("Failed to create NodeLabelsProvider based on Configuration",e);throw new IOException("Failed to create NodeLabelsProvider : " + e.getMessage(), e);}}if (LOG.isDebugEnabled()) {LOG.debug("Distributed Node Labels is enabled"+ " with provider class as : " + provider.getClass().toString());}return provider;}nodeLabelsProvider = createNodeLabelsProvider(conf);
//默认不开启启用分布式节点标签配置
if (nodeLabelsProvider != null) {addIfService(nodeLabelsProvider);nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
}

5、NodeAttributesProvider

默认不开启启用分布式节点属性配置

protected NodeAttributesProvider createNodeAttributesProvider(Configuration conf) throws IOException {NodeAttributesProvider attributesProvider = null;//获取 yarn.nodemanager.node-attributes.provider 的值,默认空//官方解释://此属性确定节点管理器将插入哪个提供程序来收集节点属性。管理员可以配置“config”、“script”或提供程序的类名。配置的类需要扩展org.apache.hadoop.yarn.server.nodemanager.nodelabels。节点属性提供者。如果配置了“config”,则使用“ConfigurationNodeLabelsProvider”;如果配置“script”,则将使用“ScriptBasedNodeAttributesProvider”。String providerString =conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);if (providerString == null || providerString.trim().length() == 0) {return attributesProvider;}switch (providerString.trim().toLowerCase()) {case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:attributesProvider = new ConfigurationNodeAttributesProvider();break;case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:attributesProvider = new ScriptBasedNodeAttributesProvider();break;default:try {Class<? extends NodeAttributesProvider> labelsProviderClass =conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,null, NodeAttributesProvider.class);attributesProvider = labelsProviderClass.newInstance();} catch (InstantiationException | IllegalAccessException| RuntimeException e) {LOG.error("Failed to create NodeAttributesProvider"+ " based on Configuration", e);throw new IOException("Failed to create NodeAttributesProvider : "+ e.getMessage(), e);}}if (LOG.isDebugEnabled()) {LOG.debug("Distributed Node Attributes is enabled"+ " with provider class as : "+ attributesProvider.getClass().toString());}return attributesProvider;}nodeAttributesProvider = createNodeAttributesProvider(conf);
if (nodeAttributesProvider != null) {addIfService(nodeAttributesProvider);nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}

6、NodeResourceMonitorImpl

nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);protected NodeResourceMonitor createNodeResourceMonitor() {//NodeResourceMonitorImpl 是 节点资源监视器的实现。它定期跟踪节点的资源利用情况,并将其报告给NMreturn new NodeResourceMonitorImpl(context);
}

NodeResourceMonitorImpl

public class NodeResourceMonitorImpl extends AbstractService implementsNodeResourceMonitor {//初始化节点资源监视器public NodeResourceMonitorImpl(Context context) {super(NodeResourceMonitorImpl.class.getName());this.nmContext = context;this.monitoringThread = new MonitoringThread();}//使用正确的参数初始化服务protected void serviceInit(Configuration conf) throws Exception {//获取 yarn.nodemanager.resource-monitor.interval-ms 的值 默认 3000//官方解释:监视节点和容器的频率。如果为0或为负数,则禁用监视this.monitoringInterval =conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS);//在节点管理器中为节点资源监视器创建ResourceCalculatorPlugin并进行配置。如果未配置插件,此方法将尝试返回可用于此系统的内存计算器插件//LINUX 为 SysInfoLinux() 下一篇博客我们重点详细看下它//WINDOWS 为 SysInfoWindows()//其他操作系统不支持this.resourceCalculatorPlugin =ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);LOG.info(" Using ResourceCalculatorPlugin : "+ this.resourceCalculatorPlugin);}//启动执行节点资源利用率监视的线程protected void serviceStart() throws Exception {if (this.isEnabled()) {//构造器中已经初始化过了,为new MonitoringThread().start()this.monitoringThread.start();}super.serviceStart();}
}

MonitoringThread

监视此节点的资源利用率的线程

private class MonitoringThread extends Thread {/*** 初始化节点资源监视线程*/public MonitoringThread() {super("Node Resource Monitor");this.setDaemon(true);}/*** 定期监视节点的资源利用率.*/@Overridepublic void run() {while (true) {// 获取节点利用率并将其保存为运行状况 以下指标数据都是通过SysInfoLinux或者SysInfoWindows来获取的//系统中的物理内存的总大小 - 系统中可用物理内存总大小long pmem = resourceCalculatorPlugin.getPhysicalMemorySize() -resourceCalculatorPlugin.getAvailablePhysicalMemorySize();//系统中的虚拟内存的总大小 - 系统中可用虚拟内存总大小long vmem =resourceCalculatorPlugin.getVirtualMemorySize()- resourceCalculatorPlugin.getAvailableVirtualMemorySize();//获取已用的虚拟核数float vcores = resourceCalculatorPlugin.getNumVCoresUsed();//ResourceUtilization对集群中一组计算机资源的利用率进行建模nodeUtilization =ResourceUtilization.newInstance((int) (pmem >> 20), // B -> MB(int) (vmem >> 20), // B -> MBvcores); // 已使用的虚拟核心// 将节点利用率指标发布到 NodeManager 指标系统NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();if (nmMetrics != null) {nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());}try {Thread.sleep(monitoringInterval);} catch (InterruptedException e) {LOG.warn(NodeResourceMonitorImpl.class.getName()+ " is interrupted. Exiting.");break;}}}}

7、ContainerManagerImpl

    containerManager =createContainerManager(context, exec, del, nodeStatusUpdater,this.aclsManager, dirsHandler);addService(containerManager);((NMContext) context).setContainerManager(containerManager);protected ContainerManagerImpl createContainerManager(Context context,ContainerExecutor exec, DeletionService del,NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,LocalDirsHandlerService dirsHandler) {return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,metrics, dirsHandler);}

ContainerManagerImpl

public class ContainerManagerImpl extends CompositeService implementsContainerManager {public ContainerManagerImpl(Context context, ContainerExecutor exec,DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {super(ContainerManagerImpl.class.getName());this.context = context;this.dirsHandler = dirsHandler;// ContainerManager级调度程序//在单独的线程中调度。目前只有一个线程能做到这一点。每个事件类型类可能有多个通道,并且可以使用线程池来调度事件。dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");this.deletionService = deletionContext;this.metrics = metrics;//创建ResourceLocalizationService服务 本地资源跟踪服务rsrcLocalizationSrvc =createResourceLocalizationService(exec, deletionContext, context,metrics);addService(rsrcLocalizationSrvc);//要使用的容器启动器实现//获取 yarn.nodemanager.containers-launcher.class 的值 默认为 ContainersLauncher.class//只有在{@link ResourceLocalizationService}启动后才能启动此服务,因为它取决于在本地文件系统上创建系统目录//会处理这些事件//    LAUNCH_CONTAINER//    RELAUNCH_CONTAINER//    RECOVER_CONTAINER//    RECOVER_PAUSED_CONTAINER//    CLEANUP_CONTAINER//    CLEANUP_CONTAINER_FOR_REINIT//    SIGNAL_CONTAINER//    PAUSE_CONTAINER//    RESUME_CONTAINERcontainersLauncher = createContainersLauncher(context, exec);addService(containersLauncher);this.nodeStatusUpdater = nodeStatusUpdater;this.containerScheduler = createContainerScheduler(context);addService(containerScheduler);AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =new AuxiliaryLocalPathHandlerImpl(dirsHandler);//配置服务启动auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,this.context, this.deletionService);auxiliaryServices.registerServiceListener(this);addService(auxiliaryServices);//如果启用了时间轴服务v.2并且启用了系统发布服务器,则初始化度量发布服务器Configuration conf = context.getConf();if (YarnConfiguration.timelineServiceV2Enabled(conf)) {if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {LOG.info("YARN system metrics publishing service is enabled");nmMetricsPublisher = createNMTimelinePublisher(context);context.setNMTimelinePublisher(nmMetricsPublisher);}this.timelineServiceV2Enabled = true;}this.containersMonitor = createContainersMonitor(exec);addService(this.containersMonitor);dispatcher.register(ContainerEventType.class,new ContainerEventDispatcher());dispatcher.register(ApplicationEventType.class,createApplicationEventDispatcher());dispatcher.register(LocalizationEventType.class,new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,nmMetricsPublisher));dispatcher.register(AuxServicesEventType.class, auxiliaryServices);dispatcher.register(ContainersMonitorEventType.class, containersMonitor);dispatcher.register(ContainersLauncherEventType.class, containersLauncher);dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);addService(dispatcher);ReentrantReadWriteLock lock = new ReentrantReadWriteLock();this.readLock = lock.readLock();this.writeLock = lock.writeLock();}public void serviceInit(Configuration conf) throws Exception {logHandler =createLogHandler(conf, this.context, this.deletionService);addIfService(logHandler);dispatcher.register(LogHandlerEventType.class, logHandler);//添加共享缓存上载服务(如果禁用共享缓存,它将不起任何作用)SharedCacheUploadService sharedCacheUploader =createSharedCacheUploaderService();addService(sharedCacheUploader);dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);createAMRMProxyService(conf);waitForContainersOnShutdownMillis =conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +SHUTDOWN_CLEANUP_SLOP_MS;super.serviceInit(conf);recover();}protected void serviceStart() throws Exception {//在删除上下文中登记用户目录Configuration conf = getConfig();final InetSocketAddress initialAddress = conf.getSocketAddr(YarnConfiguration.NM_BIND_HOST,YarnConfiguration.NM_ADDRESS,YarnConfiguration.DEFAULT_NM_ADDRESS,YarnConfiguration.DEFAULT_NM_PORT);boolean usingEphemeralPort = (initialAddress.getPort() == 0);if (context.getNMStateStore().canRecover() && usingEphemeralPort) {throw new IllegalArgumentException("Cannot support recovery with an "+ "ephemeral server port. Check the setting of "+ YarnConfiguration.NM_ADDRESS);}// 如果正在恢复,则延迟打开RPC服务,直到资源和容器的恢复完成,否则在恢复过程中来自客户端的请求可能会干扰恢复过程。final boolean delayedRpcServerStart =context.getNMStateStore().canRecover();Configuration serverConf = new Configuration(conf);//始终强制它是基于令牌的serverConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,SaslRpcServer.AuthMethod.TOKEN.toString());YarnRPC rpc = YarnRPC.create(conf);server =rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, serverConf, this.context.getNMTokenSecretManager(),conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));// 启用服务授权?if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {refreshServiceAcls(conf, NMPolicyProvider.getInstance());}String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);String hostOverride = null;if (bindHost != null && !bindHost.isEmpty()&& nmAddress != null && !nmAddress.isEmpty()) {//一个带有地址的绑定主机案例,为了支持用指定的地址重写查询主机名时发现的第一个主机名,请将指定的地址与服务器侦听的实际端口组合hostOverride = nmAddress.split(":")[0];}//启动 node IDInetSocketAddress connectAddress;if (delayedRpcServerStart) {connectAddress = NetUtils.getConnectAddress(initialAddress);} else {server.start();connectAddress = NetUtils.getConnectAddress(server);}NodeId nodeId = buildNodeId(connectAddress, hostOverride);((NodeManager.NMContext)context).setNodeId(nodeId);this.context.getNMTokenSecretManager().setNodeId(nodeId);this.context.getContainerTokenSecretManager().setNodeId(nodeId);//开始剩下的服务super.serviceStart();if (delayedRpcServerStart) {waitForRecoveredContainers();server.start();//检查节点ID是否与之前公布的一样connectAddress = NetUtils.getConnectAddress(server);NodeId serverNode = buildNodeId(connectAddress, hostOverride);if (!serverNode.equals(nodeId)) {throw new IOException("Node mismatch after server started, expected '"+ nodeId + "' but found '" + serverNode + "'");}}LOG.info("ContainerManager started at " + connectAddress);LOG.info("ContainerManager bound to " + initialAddress);}
}

8、NMLogAggregationStatusTracker

NMLogAggregationStatusTracker用于缓存已完成应用程序的日志聚合状态。它还将定期删除旧的缓存日志聚合状态。

    this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(context);addService(nmLogAggregationStatusTracker);((NMContext)context).setNMLogAggregationStatusTracker(this.nmLogAggregationStatusTracker);private NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(Context ctxt) {return new NMLogAggregationStatusTracker(ctxt);}
public class NMLogAggregationStatusTracker extends CompositeService {public NMLogAggregationStatusTracker(Context context) {super(NMLogAggregationStatusTracker.class.getName());this.nmContext = context;Configuration conf = context.getConf();//是否启用日志聚合//获取 yarn.log-aggregation-enable 的值 默认false//这也说明 disabled 默认值是 trueif (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {disabled = true;}this.recoveryStatuses = new ConcurrentHashMap<>();//ReadWriteLock的一种实现,支持与ReentrantLock类似的语义ReentrantReadWriteLock lock = new ReentrantReadWriteLock();this.readLocker = lock.readLock();this.writeLocker = lock.writeLock();this.timer = new Timer();//获取 yarn.log-aggregation-status.time-out.ms 的值 默认值 10 * 60 * 1000//ResourceManager等待NodeManager报告其日志聚合状态的时间。如果从NodeManager报告日志聚合状态的等待时间超过配置的值,RM将报告此NodeManager的日志聚合状态为超时。//NodeManager中也将使用此配置来决定是否以及何时删除缓存的日志聚合状态。long configuredRollingInterval = conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);if (configuredRollingInterval <= 0) {this.rollingInterval = YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;LOG.warn("The configured log-aggregation-status.time-out.ms is "+ configuredRollingInterval + " which should be larger than 0. "+ "Using the default value:" + this.rollingInterval + " instead.");} else {this.rollingInterval = configuredRollingInterval;}LOG.info("the rolling interval seconds for the NodeManager Cached Log "+ "aggregation status is " + (rollingInterval/1000));}@Overrideprotected void serviceStart() throws Exception {if (disabled) {LOG.warn("Log Aggregation is disabled."+ "So is the LogAggregationStatusTracker.");} else {//开启日志聚合才走这一步//LogAggregationStatusRoller 是内部类this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),rollingInterval, rollingInterval);}}private class LogAggregationStatusRoller extends TimerTask {@Overridepublic void run() {rollLogAggregationStatus();}}private void rollLogAggregationStatus() {//当我们调用rollLogAggregationStatus,基本上获取所有缓存的日志聚合状态,并删除超时期外的日志聚合状况时,我们应该阻止rollLogAg胶gationStatus调用以及pullCachedLogAggregateReports调用。所以,这里使用了writeLocker。this.writeLocker.lock();try {long currentTimeStamp = System.currentTimeMillis();LOG.info("Rolling over the cached log aggregation status.");Iterator<Entry<ApplicationId, AppLogAggregationStatusForRMRecovery>> it= recoveryStatuses.entrySet().iterator();while (it.hasNext()) {Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker =it.next();//应用已经完成了if (nmContext.getApplications().get(tracker.getKey()) == null) {if (currentTimeStamp - tracker.getValue().getLastModifiedTime()> rollingInterval) {it.remove();}}}} finally {this.writeLocker.unlock();}}
}

9、WebServer

    WebServer webServer = createWebServer(context, containerManager.getContainersMonitor(), this.aclsManager, dirsHandler);addService(webServer);((NMContext) context).setWebServer(webServer);protected WebServer createWebServer(Context nmContext,ResourceView resourceView, ApplicationACLsManager aclsManager,LocalDirsHandlerService dirsHandler) {return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);}
public class WebServer extends AbstractService {public WebServer(Context nmContext, ResourceView resourceView,ApplicationACLsManager aclsManager,LocalDirsHandlerService dirsHandler) {super(WebServer.class.getName());this.nmContext = nmContext;this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);}protected void serviceStart() throws Exception {Configuration conf = getConfig();//获取用于绑定的URL,其中可以指定绑定主机名以覆盖webAppURLWithoutScheme中的主机名。将使用webAppURLWithoutScheme中指定的端口。//NM_BIND_HOST NM的实际绑定地址//yarn.nodemanager.bind-host 的值//WebAppUtils.getNMWebAppURLWithoutScheme(conf)) 是获取 NM Webapp 地址//    获取 yarn.nodemanager.webapp.address 的值 默认值 0.0.0.0:8042String bindAddress = WebAppUtils.getWebAppBindURL(conf,YarnConfiguration.NM_BIND_HOST,WebAppUtils.getNMWebAppURLWithoutScheme(conf));//获取 CORS过滤器状态(启用/禁用)// yarn.nodemanager.webapp.cross-origin.enabled 默认 false//boolean enableCors = conf.getBoolean(YarnConfiguration.NM_WEBAPP_ENABLE_CORS_FILTER,YarnConfiguration.DEFAULT_NM_WEBAPP_ENABLE_CORS_FILTER);if (enableCors) {getConfig().setBoolean(HttpCrossOriginFilterInitializer.PREFIX+ HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);}//总是加载伪身份验证过滤器来解析URL中的“user.name”,以识别HTTP请求的用户。boolean hasHadoopAuthFilterInitializer = false;String filterInitializerConfKey = "hadoop.http.filter.initializers";Class<?>[] initializersClasses =conf.getClasses(filterInitializerConfKey);List<String> targets = new ArrayList<String>();if (initializersClasses != null) {for (Class<?> initializer : initializersClasses) {if (initializer.getName().equals(AuthenticationFilterInitializer.class.getName())) {hasHadoopAuthFilterInitializer = true;break;}targets.add(initializer.getName());}}if (!hasHadoopAuthFilterInitializer) {targets.add(AuthenticationFilterInitializer.class.getName());conf.set(filterInitializerConfKey, StringUtils.join(",", targets));}LOG.info("Instantiating NMWebApp at " + bindAddress);try {this.webApp =WebApps.$for("node", Context.class, this.nmContext, "ws").at(bindAddress).with(conf).withHttpSpnegoPrincipalKey(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY).withHttpSpnegoKeytabKey(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY).withCSRFProtection(YarnConfiguration.NM_CSRF_PREFIX).withXFSProtection(YarnConfiguration.NM_XFS_PREFIX).start(this.nmWebApp);this.port = this.webApp.httpServer().getConnectorAddress(0).getPort();} catch (Exception e) {String msg = "NMWebapps failed to start.";LOG.error(msg, e);throw new YarnRuntimeException(msg, e);}super.serviceStart();}public static class NMWebApp extends WebApp implements YarnWebParams {private final ResourceView resourceView;private final ApplicationACLsManager aclsManager;private final LocalDirsHandlerService dirsHandler;public NMWebApp(ResourceView resourceView,ApplicationACLsManager aclsManager,LocalDirsHandlerService dirsHandler) {this.resourceView = resourceView;this.aclsManager = aclsManager;this.dirsHandler = dirsHandler;}@Override//设置资源绑定合访问路径public void setup() {bind(NMWebServices.class);bind(GenericExceptionHandler.class);bind(JAXBContextResolver.class);bind(ResourceView.class).toInstance(this.resourceView);bind(ApplicationACLsManager.class).toInstance(this.aclsManager);bind(LocalDirsHandlerService.class).toInstance(dirsHandler);route("/", NMController.class, "info");route("/node", NMController.class, "node");route("/allApplications", NMController.class, "allApplications");route("/allContainers", NMController.class, "allContainers");route(pajoin("/application", APPLICATION_ID), NMController.class,"application");route(pajoin("/container", CONTAINER_ID), NMController.class,"container");route(pajoin("/containerlogs", CONTAINER_ID, APP_OWNER, CONTAINER_LOG_TYPE),NMController.class, "logs");route("/errors-and-warnings", NMController.class, "errorsAndWarnings");}@Overrideprotected Class<? extends GuiceContainer> getWebAppFilterClass() {return NMWebAppFilter.class;}}}

10、AsyncDispatcher

Dispatches{@link Event}在一个单独的线程中。目前只有一个线程能做到这一点。每个事件类型类可能有多个通道,并且可以使用线程池来调度事件。

public class AsyncDispatcher extends AbstractService implements Dispatcher {protected void serviceInit(Configuration conf) throws Exception{super.serviceInit(conf);//获取 yarn.dispatcher.print-events-info.threshold  的值 默认 5000// 用于触发在RM的主事件调度程序中记录事件类型和计数的阈值。默认值为5000,这意味着RM将在每次队列大小累计达到5000时打印事件信息。这些信息可以用来揭示RM主要处理哪类事件,这有助于缩小某些性能问题的范围。this.detailsInterval = getConfig().getInt(YarnConfiguration.YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,YarnConfiguration.DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);}@Overrideprotected void serviceStart() throws Exception {//启动所有组件super.serviceStart();eventHandlingThread = new Thread(createThread());eventHandlingThread.setName(dispatcherThreadName);eventHandlingThread.start();}Runnable createThread() {return new Runnable() {@Overridepublic void run() {while (!stopped && !Thread.currentThread().isInterrupted()) {drained = eventQueue.isEmpty();//blockNewEvents仅在调度程序要停止时设置,添加此检查是为了避免在循环的正常运行中每次获取锁和调用notify的开销。if (blockNewEvents) {synchronized (waitForDrained) {if (drained) {waitForDrained.notify();}}}Event event;try {event = eventQueue.take();} catch(InterruptedException ie) {if (!stopped) {LOG.warn("AsyncDispatcher thread interrupted", ie);}return;}if (event != null) {dispatch(event);if (printTrigger) {//记录最新的调度事件类型//可能导致排队的事件太多LOG.info("Latest dispatch event type: " + event.getType());printTrigger = false;}}}}};}
}

11、JvmPauseMonitor

    pauseMonitor = new JvmPauseMonitor();addService(pauseMonitor);metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

该类(JvmPauseMonitor)设置一个简单的线程,该线程在睡眠短时间间隔的循环中运行。如果睡眠时间明显长于其目标时间,则表示JVM或主机已暂停处理,这可能会导致其他问题。如果检测到这样的暂停,线程将记录一条消息。

public class JvmPauseMonitor extends AbstractService {protected void serviceInit(Configuration conf) throws Exception {//如果检测到暂停时间超过此阈值,则记录警告//获取 jvm.pause.warn-threshold.ms 的值 默认 10000this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);//如果检测到暂停时间超过此阈值,则记录信息//获取 jvm.pause.info-threshold.ms 的值 默认 1000this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);super.serviceInit(conf);}@Overrideprotected void serviceStart() throws Exception {//构造守护进程线程monitorThread = new Daemon(new Monitor());monitorThread.start();super.serviceStart();}
}

12、NodeStatusUpdater

    //StatusUpdater应该最后添加,这样它才能最后启动,这样我们就可以在向RM注册之前确保一切正常。addService(nodeStatusUpdater);((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);nmStore.setNodeStatusUpdater(nodeStatusUpdater);

五、总结

1、初始化本地文件系统目录,并将其中的内容封装成 LevelDB 的形式,用于失败恢复

2、创建线程池(默认4个),提供周期任务和延迟任务相关操作

3、利用脚本定期检查节点健康状态,并进行上报

4、检查是否开启启用分布式节点标签配置,默认不开启

5、检查是否开启启用分布式节点属性配置,默认不开启

6、创建节点资源监视器,并启动一个线程定期跟踪节点的资源利用情况

7、创建本节点容器管理调度程序,用来处理容器的申请、创建等事件

8、检查日志聚合状态是否开启,并定期删除旧的缓存日志

9、启动WebServer 服务,提供应用查询、容器查询、容器日志查询等功能

10、创建节点守护进程

11、创建节点状态更新服务

这篇关于Hadoop-Yarn-NodeManager都做了什么的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720952

相关文章

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

【hadoop Sqoop】Sqoop从mysql导数据到hdfs

1.下载sqoop安装包 wget https://mirrors.tuna.tsinghua.edu.cn/apache/sqoop/1.4.6/sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 2.解压安装包 tar -xzvf /sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 3.配置hadoop mv s

【Hadoop|HDFS篇】NameNode和SecondaryNameNode

1. NN和2NN的工作机制 思考:NameNode中的元数据是存储在哪里的? 首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访 问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在 内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的 Fslmage。 这样又会带来新的问题,当在内存中的元数据更新时,如

【Hadoop|HDFS篇】DataNode

1. DataNode的工作机制 1)一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。 2)DataNode启动后向NameNode注册,通过后,周期性(6h)的向NameNode上报所有块信息。 DN向NN汇报当前解读信息的时间间隔,默认6小时。 DN扫描自己节点块信息列表的时间,默认为

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Hadoop Namenode元数据持久化机制与SecondaryNamenode的作用详解

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 我们都知道namenode是用来存储元数据的,他并不是用来存储真正的数据。 那么他的元数据怎么进行持久化呢! FsImage 文件系统的镜像文件叫fsImage,它包括了文件和块信息的映射,还有文件系统的属性信息。 datan