How Hadoop-Yarn-NodeManager Monitors Containers

2024-02-28 10:44


I. Source Code Download

The official Hadoop source can be downloaded from the page below. This walkthrough uses hadoop-3.2.4.

Index of /dist/hadoop/core

II. Context

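In my post <How Hadoop-Yarn-NodeManager Launches Containers>, ContainerLaunch's prepareForLaunch() fires a ContainerEventType.CONTAINER_LAUNCHED event. ContainerImpl handles that event: it starts monitoring the container's resource usage and takes care of the follow-up work. Let's trace through the source.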
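To make the event-driven flow concrete, here is a deliberately tiny model of the one transition that matters here: in state SCHEDULED, the CONTAINER_LAUNCHED event moves the container to RUNNING and, on the way, asks for monitoring to start. This is only a sketch under stated assumptions. ToyContainer, its enums and the monitoringRequested flag are hypothetical names, and the lookup table stands in for Hadoop's StateMachineFactory rather than reproducing its API.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified model of one ContainerImpl transition.
// This is NOT Hadoop's StateMachineFactory API; all names are illustrative.
class ToyContainer {
    enum State { NEW, SCHEDULED, RUNNING }
    enum EventType { CONTAINER_LAUNCHED }

    // Transition table: (current state, event) -> next state
    private static final Map<State, Map<EventType, State>> TABLE = new EnumMap<>(State.class);
    static {
        Map<EventType, State> fromScheduled = new EnumMap<>(EventType.class);
        fromScheduled.put(EventType.CONTAINER_LAUNCHED, State.RUNNING);
        TABLE.put(State.SCHEDULED, fromScheduled);
    }

    State state = State.SCHEDULED;
    // Stands in for sendContainerMonitorStartEvent() in the real LaunchTransition
    boolean monitoringRequested = false;

    void handle(EventType event) {
        Map<EventType, State> row = TABLE.get(state);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + state);
        }
        if (next == State.RUNNING) {
            // "LaunchTransition" hook: ask the monitor to start tracking this container
            monitoringRequested = true;
        }
        state = next;
    }
}
```

Keeping the transitions in a table means an unexpected event (for example a second CONTAINER_LAUNCHED while already RUNNING) is rejected instead of silently changing state.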

III. Walking Through the Source

1. ContainerImpl

public class ContainerImpl implements Container {

  private static StateMachineFactory<ContainerImpl, ContainerState,
      ContainerEventType, ContainerEvent> stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState,
          ContainerEventType, ContainerEvent>(ContainerState.NEW)
      // ...... other transitions omitted ......
      .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
          ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
      // ...... other transitions omitted ......
      .installTopology();

  static class LaunchTransition extends ContainerTransition {
    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Send the container-monitoring event so the container's resource usage is monitored
      container.sendContainerMonitorStartEvent();
      container.metrics.runningContainer();
      container.wasLaunched = true;

      if (container.isReInitializing()) {
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.FINISH_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
      }
      container.setIsReInitializing(false);
      // Check if this launch was due to a re-initialization.
      // If autocommit == true, then wipe the re-init context. This ensures
      // that any subsequent failures do not trigger a rollback.
      if (container.reInitContext != null
          && !container.reInitContext.canRollback()) {
        container.reInitContext = null;
      }

      if (container.recoveredAsKilled) {
        LOG.info("Killing " + container.containerId
            + " due to recovered as killed");
        container.addDiagnostics("Container recovered as killed.\n");
        container.dispatcher.getEventHandler().handle(
            new ContainersLauncherEvent(container,
                ContainersLauncherEventType.CLEANUP_CONTAINER));
      }
    }
  }

  private void sendContainerMonitorStartEvent() {
    long launchDuration = clock.getTime() - containerLaunchStartTime;
    metrics.addContainerLaunchDuration(launchDuration);

    long pmemBytes = getResource().getMemorySize() * 1024 * 1024L;
    float pmemRatio = daemonConf.getFloat(
        YarnConfiguration.NM_VMEM_PMEM_RATIO,
        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    long vmemBytes = (long) (pmemRatio * pmemBytes);
    int cpuVcores = getResource().getVirtualCores();
    long localizationDuration = containerLaunchStartTime -
        containerLocalizationStartTime;
    // This fires ContainersMonitorEventType.START_MONITORING_CONTAINER,
    // which is handled by ContainersMonitorImpl
    dispatcher.getEventHandler().handle(
        new ContainerStartMonitoringEvent(containerId,
            vmemBytes, pmemBytes, cpuVcores, launchDuration,
            localizationDuration));
  }
}
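The limits that sendContainerMonitorStartEvent() hands to the monitor come from simple arithmetic. The container's requested memory in MB is converted to bytes for the physical limit, and the virtual limit is that value times yarn.nodemanager.vmem-pmem-ratio. The sketch below reproduces only those two formulas, for a 1024 MB container with the default ratio of 2.1. MonitorLimits and its methods are hypothetical names.

```java
// Mirrors the arithmetic in sendContainerMonitorStartEvent(); the class and
// method names are hypothetical, only the formulas come from the code above.
class MonitorLimits {
    // pmemBytes = requested memory (MB) * 1024 * 1024
    static long pmemBytes(long memoryMb) {
        return memoryMb * 1024 * 1024L;
    }

    // vmemBytes = (long) (ratio * pmemBytes); float * long is evaluated in float
    static long vmemBytes(long memoryMb, float pmemRatio) {
        return (long) (pmemRatio * pmemBytes(memoryMb));
    }

    public static void main(String[] args) {
        long pmem = pmemBytes(1024);            // a 1024 MB container
        long vmem = vmemBytes(1024, 2.1f);      // default yarn.nodemanager.vmem-pmem-ratio
        System.out.println(pmem + " " + vmem);  // prints "1073741824 2254857728"
    }
}
```

Because the ratio is a float, the product is computed in float precision, so the virtual limit comes out as 2254857728 bytes rather than the exact 2.1 * 2^30 (about 2254857830 bytes).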

2. ContainersMonitorImpl

Monitors containers, collecting their resource usage, and preempts (kills) any container that exceeds its limits.
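The node-level numbers that serviceInit() works out can be condensed into a small sketch. It covers the monitoring interval with its fallback chain, whether monitoring is enabled, the maximum physical and virtual memory for all containers, and the 80% thrashing check. It assumes a plain Map in place of Hadoop's Configuration and takes the container memory (yarn.nodemanager.resource.memory-mb) as a parameter instead of calling NodeManagerHardwareUtils. NodeMonitorSettings and its members are hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for the limit calculations in ContainersMonitorImpl.serviceInit().
// A plain Map replaces Hadoop's Configuration; property names and defaults are the ones
// quoted in this walkthrough, the class itself is illustrative only.
class NodeMonitorSettings {
    static final long UNKNOWN_MEMORY_LIMIT = -1L;

    final long monitoringIntervalMs;
    final boolean monitorEnabled;
    final long maxPmemBytes;
    final long maxVmemBytes;

    NodeMonitorSettings(Map<String, String> conf, long containerMemoryMb) {
        // container-monitor interval falls back to resource-monitor interval, then to 3000 ms
        String fallback = conf.getOrDefault("yarn.nodemanager.resource-monitor.interval-ms", "3000");
        monitoringIntervalMs = Long.parseLong(
            conf.getOrDefault("yarn.nodemanager.container-monitor.interval-ms", fallback));
        boolean enabledFlag = Boolean.parseBoolean(
            conf.getOrDefault("yarn.nodemanager.container-monitor.enabled", "true"));
        // monitoring needs both the flag and a positive interval
        monitorEnabled = enabledFlag && monitoringIntervalMs > 0;

        float vmemRatio = Float.parseFloat(
            conf.getOrDefault("yarn.nodemanager.vmem-pmem-ratio", "2.1"));
        if (!(vmemRatio > 0.99f)) {
            throw new IllegalArgumentException("yarn.nodemanager.vmem-pmem-ratio should be at least 1.0");
        }
        maxPmemBytes = containerMemoryMb * 1024 * 1024L;
        maxVmemBytes = (long) (vmemRatio * maxPmemBytes);
    }

    // Same condition as the "Thrashing might happen" warning
    boolean mightThrash(long totalPhysicalMemoryBytes) {
        return totalPhysicalMemoryBytes != UNKNOWN_MEMORY_LIMIT
            && maxPmemBytes > totalPhysicalMemoryBytes * 0.80f;
    }
}
```

With the defaults (8192 MB, ratio 2.1) this gives 8589934592 bytes of physical and 18038861824 bytes (about 16.8 GB) of virtual memory for containers. On a node with 8 GB of RAM the thrashing condition is true; on one with 16 GB it is not.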

public class ContainersMonitorImpl extends AbstractService implements
    ContainersMonitor {

  private final static Logger LOG =
      LoggerFactory.getLogger(ContainersMonitorImpl.class);
  private final static Logger AUDITLOG =
      LoggerFactory.getLogger(ContainersMonitorImpl.class.getName() + ".audit");

  private long monitoringInterval;
  private MonitoringThread monitoringThread;
  private int logCheckInterval;
  private LogMonitorThread logMonitorThread;
  private long logDirSizeLimit;
  private long logTotalSizeLimit;
  private CGroupElasticMemoryController oomListenerThread;
  private boolean containerMetricsEnabled;
  private long containerMetricsPeriodMs;
  private long containerMetricsUnregisterDelayMs;

  @VisibleForTesting
  final Map<ContainerId, ProcessTreeInfo> trackingContainers =
      new ConcurrentHashMap<>();

  private final ContainerExecutor containerExecutor;
  private final Dispatcher eventDispatcher;
  private final Context context;
  private ResourceCalculatorPlugin resourceCalculatorPlugin;
  private Configuration conf;
  private static float vmemRatio;
  // Interface class for obtaining the resource usage of a process tree.
  // Note: it is not meant for external users, only for external developers who want
  // to extend it with their own process-tree implementation, especially for
  // platforms other than Linux and Windows.
  private Class<? extends ResourceCalculatorProcessTree> processTreeClass;

  private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
  private long maxPmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;

  private boolean pmemCheckEnabled;
  private boolean vmemCheckEnabled;
  private boolean elasticMemoryEnforcement;
  private boolean strictMemoryEnforcement;
  private boolean containersMonitorEnabled;
  private boolean logMonitorEnabled;

  private long maxVCoresAllottedForContainers;

  private static final long UNKNOWN_MEMORY_LIMIT = -1L;
  private int nodeCpuPercentageForYARN;

  /**
   * Type of container metric.
   */
  @Private
  public enum ContainerMetric {
    CPU, MEMORY
  }

  // ResourceUtilization models the utilization of a set of computer resources in the cluster
  private ResourceUtilization containersUtilization;

  private volatile boolean stopped = false;

  public ContainersMonitorImpl(ContainerExecutor exec,
      AsyncDispatcher dispatcher, Context context) {
    super("containers-monitor");

    this.containerExecutor = exec;
    this.eventDispatcher = dispatcher;
    this.context = context;

    this.monitoringThread = new MonitoringThread();
    this.logMonitorThread = new LogMonitorThread();

    // ResourceUtilization.newInstance(physical memory, virtual memory, CPU utilization)
    this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
  }

  @Override
  protected void serviceInit(Configuration myConf) throws Exception {
    this.conf = myConf;
    // How often containers are monitored:
    // read yarn.nodemanager.container-monitor.interval-ms; if it is not set,
    // yarn.nodemanager.resource-monitor.interval-ms is used instead.
    // A value of 0 or less disables container monitoring.
    // yarn.nodemanager.resource-monitor.interval-ms (how often the node and its
    // containers are monitored) defaults to 3000 ms, i.e. 3 s; 0 or less disables monitoring.
    this.monitoringInterval =
        this.conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
            this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
                YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
    // How often (ms) container log directory usage is checked:
    // yarn.nodemanager.container-log-monitor.interval-ms, default 60000 ms, i.e. 1 min
    this.logCheckInterval =
        conf.getInt(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MON_INTERVAL_MS);
    // Disk-space limit (bytes) for a single container's log directory
    // (1 GiB = 1024 MiB = 1024*1024 KiB = 1024*1024*1024 bytes):
    // yarn.nodemanager.container-log-monitor.dir-size-limit-bytes,
    // default 1000000000L, roughly 1 GB
    this.logDirSizeLimit =
        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES);
    // Disk-space limit (bytes) for all of a container's logs:
    // yarn.nodemanager.container-log-monitor.total-size-limit-bytes,
    // default 10000000000L, i.e. 10 GB
    this.logTotalSizeLimit =
        conf.getLong(YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES);

    // Plugin that computes resource information on this system. If none is configured,
    // it tries to return a memory calculator plugin usable on this system:
    // first yarn.nodemanager.container-monitor.resource-calculator.class (default empty),
    // then yarn.nodemanager.resource-calculator.class (default empty);
    // if both are empty, the OS decides: SysInfoLinux on Linux, SysInfoWindows on Windows
    this.resourceCalculatorPlugin =
        ResourceCalculatorPlugin.getContainersMonitorPlugin(this.conf);
    LOG.info(" Using ResourceCalculatorPlugin : "
        + this.resourceCalculatorPlugin);
    // yarn.nodemanager.container-monitor.process-tree.class
    // (computes process-tree resource utilization), default empty
    processTreeClass = this.conf.getClass(
        YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
        ResourceCalculatorProcessTree.class);
    LOG.info(" Using ResourceCalculatorProcessTree : "
        + this.processTreeClass);

    // Flag that enables container metrics:
    // yarn.nodemanager.container-metrics.enable, default true
    this.containerMetricsEnabled =
        this.conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
    // Container-metrics flush period (ms); -1 means flush on completion:
    // yarn.nodemanager.container-metrics.period-ms, default -1
    this.containerMetricsPeriodMs =
        this.conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
    // Delay (ms) before a finished container's metrics are unregistered:
    // yarn.nodemanager.container-metrics.unregister-delay-ms, default 10000 ms, i.e. 10 s
    this.containerMetricsUnregisterDelayMs = this.conf.getLong(
        YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
        YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);

    // NodeManagerHardwareUtils determines hardware characteristics such as the
    // number of processors and the amount of memory on the node.
    // getContainerMemoryMB() returns how much memory to set aside for YARN containers.
    // If a number is given in the configuration, it is returned; if nothing is given, it is -1.
    // If the OS is "unknown" (no ResourceCalculatorPlugin is implemented for it),
    // the default NodeManager physical memory is returned.
    // If the OS has a ResourceCalculatorPlugin, it is computed as 0.8 * (RAM - 2 * JVM memory),
    // i.e. 80% of the memory left after accounting for the DataNode and the NodeManager.
    // If the result is below 1 GB, a warning is logged.
    // yarn.nodemanager.resource.detect-hardware-capabilities (auto-detect node
    // capabilities such as memory and CPU) defaults to false; in that case the configured
    // yarn.nodemanager.resource.memory-mb (memory in MB that can be allocated to containers) is used.
    // The source and the official documentation differ here: the docs give -1 as the default,
    // while the source default is 8 * 1024 MB, i.e. 8 GB; even when it is set to -1 the source
    // still falls back to 8 GB. Other values can be configured.
    // The returned 8 * 1024 is multiplied by 1024 * 1024L, converting 8 GB to bytes.
    long configuredPMemForContainers =
        NodeManagerHardwareUtils.getContainerMemoryMB(
            this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
    // getVCores() returns the number of vcores on the system available to YARN containers.
    // If a number is given in the configuration, it is returned; if nothing is given, it is -1.
    // If the OS is "unknown" (no ResourceCalculatorPlugin), the default NodeManager core count is returned.
    // If yarn.nodemanager.cpu.use_logical_processers is true, the logical processor count
    // (hyperthreads counted as cores) is returned; otherwise the physical core count.
    // yarn.nodemanager.resource.cpu-vcores (virtual CPU cores that can be allocated to containers)
    // is used by the RM scheduler when allocating resources to containers; it does not limit
    // how many CPUs YARN containers actually use. If it is -1 and
    // yarn.nodemanager.resource.detect-hardware-capabilities is true, it is determined
    // automatically from the hardware on Windows and Linux; otherwise it defaults to 8.
    long configuredVCoresForContainers =
        NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
            this.conf);
    // Set these regardless of whether checks are enabled; the UI needs them.
    // ///////// Physical memory configuration //////
    // maxPmemAllottedForContainers = 8 GB
    // maxVCoresAllottedForContainers = 8 vcores
    // So by default, the containers on a node can be allotted at most 8 vcores and 8 GB in total
    this.maxPmemAllottedForContainers = configuredPMemForContainers;
    this.maxVCoresAllottedForContainers = configuredVCoresForContainers;

    // ///////// Virtual memory configuration //////
    // yarn.nodemanager.vmem-pmem-ratio, default 2.1: the ratio between virtual and
    // physical memory when setting container memory limits. Container allocations are
    // expressed in physical memory, and virtual memory usage may exceed the allocation
    // by this ratio.
    vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    // Validate the ratio: it must be greater than 0.99
    Preconditions.checkArgument(vmemRatio > 0.99f,
        YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
    // Maximum virtual memory allotted for containers, by default 2.1 * 8 GB = 16.8 GB
    this.maxVmemAllottedForContainers =
        (long) (vmemRatio * configuredPMemForContainers);

    // Whether physical memory limits are enforced for containers:
    // yarn.nodemanager.pmem-check-enabled, default true
    pmemCheckEnabled = this.conf.getBoolean(
        YarnConfiguration.NM_PMEM_CHECK_ENABLED,
        YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
    // Whether virtual memory limits are enforced for containers:
    // yarn.nodemanager.vmem-check-enabled, default true
    vmemCheckEnabled = this.conf.getBoolean(
        YarnConfiguration.NM_VMEM_CHECK_ENABLED,
        YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
    // Enable elastic memory control (Linux only). When enabled, the NodeManager adds a
    // listener that receives an event when all containers together exceed the limit.
    // The limit is given by yarn.nodemanager.resource.memory-mb; if that is not set,
    // it is derived from the hardware capabilities
    // (see yarn.nodemanager.resource.detect-hardware-capabilities).
    // The limit applies to physical or virtual (rss + swap) memory, depending on whether
    // yarn.nodemanager.pmem-check-enabled or yarn.nodemanager.vmem-check-enabled is set.
    // yarn.nodemanager.elastic-memory-control.enabled, default false
    elasticMemoryEnforcement = this.conf.getBoolean(
        YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED,
        YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED);
    // Whether YARN CGroups strict memory enforcement is enabled; as the name suggests,
    // a container is killed immediately once it exceeds its limit:
    // yarn.nodemanager.resource.memory.enforced, default true
    strictMemoryEnforcement = conf.getBoolean(
        YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED,
        YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED);
    LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
    LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
    LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement);
    LOG.info("Strict memory control enabled: " + strictMemoryEnforcement);

    // Elastic memory control is off by default, so this branch is skipped
    if (elasticMemoryEnforcement) {
      if (!CGroupElasticMemoryController.isAvailable()) {
        // Test for availability outside the constructor
        // to be able to write non-Linux unit tests for
        // CGroupElasticMemoryController
        throw new YarnException(
            "CGroup Elastic Memory controller enabled but " +
            "it is not available. Exiting.");
      } else {
        this.oomListenerThread = new CGroupElasticMemoryController(
            conf,
            context,
            ResourceHandlerModule.getCGroupsHandler(),
            pmemCheckEnabled,
            vmemCheckEnabled,
            pmemCheckEnabled ?
                maxPmemAllottedForContainers : maxVmemAllottedForContainers
        );
      }
    }

    // isContainerMonitorEnabled() is true by default and monitoringInterval
    // defaults to 3000 ms (3 s), so containersMonitorEnabled is true:
    // container monitoring is on by default
    containersMonitorEnabled =
        isContainerMonitorEnabled() && monitoringInterval > 0;
    LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);

    // Flag that enables the container log monitor, which enforces
    // container log directory size limits:
    // yarn.nodemanager.container-log-monitor.enable, default false
    logMonitorEnabled =
        conf.getBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
            YarnConfiguration.DEFAULT_NM_CONTAINER_LOG_MONITOR_ENABLED);
    LOG.info("Container Log Monitor Enabled: " + logMonitorEnabled);

    // Percentage of the node's physical CPU configured for YARN containers (0 to 100).
    // This setting lets users cap the CPU used by YARN containers; it currently only
    // works on Linux with cgroups. The default is to use 100% of the CPU:
    // yarn.nodemanager.resource.percentage-physical-cpu-limit, default 100,
    // so nodeCpuPercentageForYARN is 100 by default
    nodeCpuPercentageForYARN =
        NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf);

    // true by default: physical memory limits are enforced for containers
    if (pmemCheckEnabled) {
      // Logged if the actual physical memory cannot be determined
      long totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
      // On Linux (the default OS here) resourceCalculatorPlugin is SysInfoLinux
      if (this.resourceCalculatorPlugin != null) {
        // SysInfoLinux reads and parses /proc/meminfo once to compute the memory
        // information, assigning ramSize, hardwareCorruptSize, hugePagesTotal and hugePageSize:
        // totalPhysicalMemoryOnNM = (ramSize - hardwareCorruptSize - (hugePagesTotal * hugePageSize)) * 1024
        // i.e. (total RAM - corrupted RAM - (reserved huge pages * size of each huge page)) * 1024
        // See my post <How Hadoop-Yarn-NodeManager Computes Resource Information on Linux>
        // ramSize: total RAM
        // hardwareCorruptSize: RAM that is corrupted and unusable
        // hugePagesTotal: number of reserved huge pages
        // hugePageSize: size of each huge page
        totalPhysicalMemoryOnNM =
            this.resourceCalculatorPlugin.getPhysicalMemorySize();
        if (totalPhysicalMemoryOnNM <= 0) {
          LOG.warn("NodeManager's totalPmem could not be calculated. "
              + "Setting it to " + UNKNOWN_MEMORY_LIMIT);
          totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
        }
      }

      // Allotting containers more than 80% of the node's total physical memory may cause thrashing
      if (totalPhysicalMemoryOnNM != UNKNOWN_MEMORY_LIMIT &&
          this.maxPmemAllottedForContainers > totalPhysicalMemoryOnNM * 0.80f) {
        LOG.warn("NodeManager configured with "
            + TraditionalBinaryPrefix.long2String(maxPmemAllottedForContainers,
            "", 1)
            + " physical memory allocated to containers, which is more than "
            + "80% of the total physical memory available ("
            + TraditionalBinaryPrefix.long2String(totalPhysicalMemoryOnNM, "",
            1) + "). Thrashing might happen.");
      }
    }
    super.serviceInit(this.conf);
  }

  // Whether the container monitor is enabled:
  // yarn.nodemanager.container-monitor.enabled, default true
  private boolean isContainerMonitorEnabled() {
    return conf.getBoolean(YarnConfiguration.NM_CONTAINER_MONITOR_ENABLED,
        YarnConfiguration.DEFAULT_NM_CONTAINER_MONITOR_ENABLED);
  }

  /**
   * Get the best process tree calculator.
   * @param pId container process id
   * @return process tree calculator
   */
  private ResourceCalculatorProcessTree
      getResourceCalculatorProcessTree(String pId) {
    return ResourceCalculatorProcessTree.
        getResourceCalculatorProcessTree(
            pId, processTreeClass, conf);
  }

  private boolean isResourceCalculatorAvailable() {
    if (resourceCalculatorPlugin == null) {
      LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
          + this.getClass().getName() + " is disabled.");
      return false;
    }
    if (getResourceCalculatorProcessTree("0") == null) {
      LOG.info("ResourceCalculatorProcessTree is unavailable on this system. "
          + this.getClass().getName() + " is disabled.");
      return false;
    }
    return true;
  }

  @Override
  protected void serviceStart() throws Exception {
    // containersMonitorEnabled is true by default: container monitoring is on
    if (containersMonitorEnabled) {
      // Start a thread that monitors the containers
      this.monitoringThread.start();
    }
    // Elastic memory control is off by default
    if (oomListenerThread != null) {
      // Elastic memory control is a cgroups-based mechanism that lets some containers
      // use more than their configured resources as long as the node-wide threshold is
      // not exceeded, so oomListenerThread is started to watch that node-wide threshold
      oomListenerThread.start();
    }
    // The container log monitor is off by default
    if (logMonitorEnabled) {
      this.logMonitorThread.start();
    }
    super.serviceStart();
  }

  private class MonitoringThread extends Thread {
    MonitoringThread() {
      super("Container Monitor");
    }

    @Override
    public void run() {

      while (!stopped && !Thread.currentThread().isInterrupted()) {
        // Print the processTrees for debugging
        if (LOG.isDebugEnabled()) {
          StringBuilder tmp = new StringBuilder("[ ");
          for (ProcessTreeInfo p : trackingContainers.values()) {
            tmp.append(p.getPID());
            tmp.append(" ");
          }
          LOG.debug("Current ProcessTree list : "
              + tmp.substring(0, tmp.length()) + "]");
        }

        // Temporary structure used to compute the total resource utilization of the containers
        ResourceUtilization trackedContainersUtilization =
            ResourceUtilization.newInstance(0, 0, 0.0f);

        // Now do the monitoring for trackingContainers: check memory usage and
        // kill any overflowing containers. Every container puts its own info into
        // trackingContainers when it starts; see onStartMonitoringContainer()
        long vmemUsageByAllContainers = 0;
        long pmemByAllContainers = 0;
        long cpuUsagePercentPerCoreByAllContainers = 0;
        for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
            .entrySet()) {
          ContainerId containerId = entry.getKey();
          ProcessTreeInfo ptInfo = entry.getValue();
          try {
            // Initialize process trees that have not been initialized yet
            initializeProcessTrees(entry);

            String pId = ptInfo.getPID();
            if (pId == null || !isResourceCalculatorAvailable()) {
              continue; // this processTree cannot be tracked
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Constructing ProcessTree for : PID = " + pId
                  + " ContainerId = " + containerId);
            }
            ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
            pTree.updateProcessTree();    // update process-tree
            // Virtual memory used by all processes in the process tree
            long currentVmemUsage = pTree.getVirtualMemorySize();
            // Resident set size (RSS) memory used by all processes in the process tree.
            // RSS is the physical memory the processes currently occupy
            // (including memory used by shared libraries)
            long currentPmemUsage = pTree.getRssMemorySize();
            if (currentVmemUsage < 0 || currentPmemUsage < 0) {
              // YARN-6862/YARN-5021 If the container just exited or for
              // another reason the physical/virtual memory is UNAVAILABLE (-1)
              // the values shouldn't be aggregated.
              LOG.info("Skipping monitoring container {} because "
                  + "memory usage is not available.", containerId);
              continue;
            }

            // if machine has 6 cores and 3 are used,
            // cpuUsagePercentPerCore should be 300%
            // CPU usage of all processes in the tree, averaged between samples and
            // expressed like top as a ratio of total CPU cycles: if two of four cores
            // are busy, it returns 200.0.
            // Note: UNAVAILABLE is returned when CPU usage is not available;
            // returning any other error code is discouraged.
            float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
            if (cpuUsagePercentPerCore < 0) {
              // CPU usage is not available likely because the container just
              // started. Let us skip this turn and consider this container
              // in the next iteration.
              LOG.info("Skipping monitoring container " + containerId
                  + " since CPU usage is not yet available.");
              continue;
            }

            // Record usage metrics
            recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
                currentPmemUsage, trackedContainersUtilization);

            // Check resource limits and act if they are exceeded
            checkLimit(containerId, pId, pTree, ptInfo,
                currentVmemUsage, currentPmemUsage);

            // Accumulate the total memory usage of all containers
            vmemUsageByAllContainers += currentVmemUsage;
            pmemByAllContainers += currentPmemUsage;
            // Accumulate the total CPU usage of all containers
            cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;

            // Report usage metrics to the timeline service
            reportResourceUsage(containerId, currentPmemUsage,
                cpuUsagePercentPerCore);
          } catch (Exception e) {
            // Log the exception and proceed to the next container.
            LOG.warn("Uncaught exception in ContainersMonitorImpl "
                + "while monitoring resource of {}", containerId, e);
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total Resource Usage stats in NM by all containers : "
              + "Virtual Memory= " + vmemUsageByAllContainers
              + ", Physical Memory= " + pmemByAllContainers
              + ", Total CPU usage(% per core)= "
              + cpuUsagePercentPerCoreByAllContainers);
        }

        // Save the aggregated utilization of the containers
        setContainersUtilization(trackedContainersUtilization);

        // Publish container utilization metrics to the NodeManager metrics system
        NodeManagerMetrics nmMetrics = context.getNodeManagerMetrics();
        if (nmMetrics != null) {
          nmMetrics.setContainerUsedMemGB(
              trackedContainersUtilization.getPhysicalMemory());
          nmMetrics.setContainerUsedVMemGB(
              trackedContainersUtilization.getVirtualMemory());
          nmMetrics.setContainerCpuUtilization(
              trackedContainersUtilization.getCPU());
        }

        try {
          // Container monitoring interval, 3 s by default
          Thread.sleep(monitoringInterval);
        } catch (InterruptedException e) {
          LOG.warn(ContainersMonitorImpl.class.getName()
              + " is interrupted. Exiting.");
          break;
        }
      }
    }

    private void recordUsage(ContainerId containerId, String pId,
        ResourceCalculatorProcessTree pTree,
        ProcessTreeInfo ptInfo,
        long currentVmemUsage, long currentPmemUsage,
        ResourceUtilization trackedContainersUtilization) {
      // if machine has 6 cores and 3 are used,
      // cpuUsagePercentPerCore should be 300% and
      // cpuUsageTotalCoresPercentage should be 50%
      float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
      float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
          resourceCalculatorPlugin.getNumProcessors();

      // Multiply by 1000 to avoid losing data when converting to int.
      // milliVcoresUsed = total-core CPU percentage * 1000 * vcores for containers (8)
      //                   / CPU percentage for YARN (100)
      // e.g. at 50% of all cores: 50 * 1000 * 8 / 100 = 4000 milli-vcores, i.e. 4 vcores
      int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
          * maxVCoresAllottedForContainers / nodeCpuPercentageForYARN);
      // Virtual memory limit of the process tree (bytes)
      long vmemLimit = ptInfo.getVmemLimit();
      // Physical memory limit of the process tree (bytes)
      long pmemLimit = ptInfo.getPmemLimit();
      if (AUDITLOG.isDebugEnabled()) {
        int vcoreLimit = ptInfo.getCpuVcores();
        long cumulativeCpuTime = pTree.getCumulativeCpuTime();
        AUDITLOG.debug(String.format(
            "Resource usage of ProcessTree %s for container-id %s:" +
                " %s %%CPU: %f %%CPU-cores: %f" +
                " vCores-used: %d of %d Cumulative-CPU-ms: %d",
            pId, containerId.toString(),
            formatUsageString(
                currentVmemUsage, vmemLimit,
                currentPmemUsage, pmemLimit),
            cpuUsagePercentPerCore,
            cpuUsageTotalCoresPercentage,
            milliVcoresUsed / 1000, vcoreLimit,
            cumulativeCpuTime));
      }

      // Add this container's resource utilization (>> 20 converts bytes to MB)
      trackedContainersUtilization.addTo(
          (int) (currentPmemUsage >> 20),
          (int) (currentVmemUsage >> 20),
          milliVcoresUsed / 1000.0f);

      // Add the usage to the container metrics
      if (containerMetricsEnabled) {
        ContainerMetrics.forContainer(
            containerId, containerMetricsPeriodMs,
            containerMetricsUnregisterDelayMs).recordMemoryUsage(
            (int) (currentPmemUsage >> 20));
        ContainerMetrics.forContainer(
            containerId, containerMetricsPeriodMs,
            containerMetricsUnregisterDelayMs).recordCpuUsage((int)
            cpuUsagePercentPerCore, milliVcoresUsed);
      }
    }

    private void checkLimit(ContainerId containerId, String pId,
        ResourceCalculatorProcessTree pTree,
        ProcessTreeInfo ptInfo,
        long currentVmemUsage,
        long currentPmemUsage) {
      Optional<Boolean> isMemoryOverLimit = Optional.empty();
      String msg = "";
      int containerExitStatus = ContainerExitStatus.INVALID;

      // strictMemoryEnforcement defaults to true and elasticMemoryEnforcement to false,
      // so this branch is only taken when elasticMemoryEnforcement is turned on
      if (strictMemoryEnforcement && elasticMemoryEnforcement) {
        // Both elastic and strict memory control are implemented with cgroups.
        // If a container exceeds its request, the elastic memory control mechanism
        // freezes it, so we check for that here and kill it. Otherwise, if the node
        // never exceeded its limit and the procfs-based memory accounting differs
        // from the cgroup-based accounting, the container would not be killed.
        // The handler is CGroupsMemoryResourceHandlerImpl by default: a handler class
        // for the memory controller. YARN already provides a physical memory monitor
        // in Java, but it is not as good as CGroups. This handler sets soft and hard
        // memory limits; the soft limit is set to 90% of the hard limit.
        MemoryResourceHandler handler =
            ResourceHandlerModule.getMemoryResourceHandler();
        if (handler != null) {
          // Check whether the container is in an OOM state
          isMemoryOverLimit = handler.isUnderOOM(containerId);
          containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
          msg = containerId + " is under oom because it exceeded its" +
              " physical memory limit";
        }
      } else if (strictMemoryEnforcement || elasticMemoryEnforcement) {
        // if cgroup-based memory control is enabled
        isMemoryOverLimit = Optional.of(false);
      }

      if (!isMemoryOverLimit.isPresent()) {
        long vmemLimit = ptInfo.getVmemLimit();
        long pmemLimit = ptInfo.getPmemLimit();
        // As processes begin with an age of 1, we want to see if there
        // are processes more than 1 iteration old.
        long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
        long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
        // true by default: virtual memory limits are enforced for containers
        if (isVmemCheckEnabled()
            && isProcessTreeOverLimit(containerId.toString(),
            currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
          // The current usage (age=0) is always higher than the aged usage. We
          // do not show the aged size in the message, base the delta on the
          // current usage
          long delta = currentVmemUsage - vmemLimit;
          // Container (the root process) is still alive and overflowing
          // memory. Dump the process-tree and then clean it up.
          msg = formatErrorMessage("virtual",
              formatUsageString(currentVmemUsage, vmemLimit,
                  currentPmemUsage, pmemLimit),
              pId, containerId, pTree, delta);
          isMemoryOverLimit = Optional.of(true);
          containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
          // true by default: physical memory limits are enforced for containers.
          // isProcessTreeOverLimit() checks whether the current memory usage of the
          // container's process tree exceeds the limit.
          // When a Java process execs a program, it may briefly take up twice its memory
          // size, because the JVM does fork()+exec() and fork creates a copy of the
          // parent's memory. If the monitoring thread samples the tree's memory at that
          // instant, it may think the tree is over the limit and kill it, even though the
          // process itself is not at fault. This is handled with a heuristic: a process
          // tree using more than twice the memory limit is killed immediately; a tree whose
          // processes older than the monitoring interval exceed the limit (by any amount)
          // is killed as well. Otherwise it gets the benefit of the doubt for one more iteration.
        } else if (isPmemCheckEnabled()
            && isProcessTreeOverLimit(containerId.toString(),
            currentPmemUsage, curRssMemUsageOfAgedProcesses,
            pmemLimit)) {
          // The current usage (age=0) is always higher than the aged usage. We
          // do not show the aged size in the message, base the delta on the
          // current usage
          long delta = currentPmemUsage - pmemLimit;
          // Container (the root process) is still alive and overflowing
          // memory. Dump the process-tree and then clean it up.
          msg = formatErrorMessage("physical",
              formatUsageString(currentVmemUsage, vmemLimit,
                  currentPmemUsage, pmemLimit),
              pId, containerId, pTree, delta);
          isMemoryOverLimit = Optional.of(true);
          containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
        }
      }

      if (isMemoryOverLimit.isPresent() && isMemoryOverLimit.get()
          && trackingContainers.remove(containerId) != null) {
        // Virtual or physical memory over limit. Fail the container and
        // remove the corresponding process tree
        LOG.warn(msg);
        // Warn if the root process is not a process-group leader
        if (!pTree.checkPidPgrpidForMatch()) {
          LOG.error("Killed container process with PID " + pId
              + " but it is not a process group leader.");
        }
        // Kill the container
        eventDispatcher.getEventHandler().handle(
            new ContainerKillEvent(containerId,
                containerExitStatus, msg));
        LOG.info("Removed ProcessTree with root " + pId);
      }
    }
  }

  private void onStopMonitoringContainer(
      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    LOG.info("Stopping resource-monitoring for " + containerId);
    updateContainerMetrics(monitoringEvent);
    trackingContainers.remove(containerId);
  }

  private void onStartMonitoringContainer(
      ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
    ContainerStartMonitoringEvent startEvent =
        (ContainerStartMonitoringEvent) monitoringEvent;
    LOG.info("Starting resource-monitoring for " + containerId);
    updateContainerMetrics(monitoringEvent);
    trackingContainers.put(containerId,
        new ProcessTreeInfo(containerId, null, null,
            startEvent.getVmemLimit(), startEvent.getPmemLimit(),
            startEvent.getCpuVcores()));
  }
}
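The CPU figures in recordUsage() are easiest to follow with concrete numbers. The sketch below repeats its two conversions: per-core percentage to milli-vcores (rescaled by the configured vcores and percentage-physical-cpu-limit) and bytes to MB via >> 20. CpuUsageMath and its method names are hypothetical; the formulas are copied from the code above.

```java
// Reproduces the CPU and memory arithmetic in recordUsage(); names are hypothetical.
class CpuUsageMath {
    // cpuUsagePercentPerCore: e.g. 300.0 when 3 cores are fully busy
    // numProcessors: processors reported by the resource calculator plugin
    // maxVCores: vcores configured for containers (8 by default in this walkthrough)
    // nodeCpuPercentageForYarn: percentage-physical-cpu-limit (100 by default)
    static int milliVcoresUsed(float cpuUsagePercentPerCore, int numProcessors,
                               long maxVCores, int nodeCpuPercentageForYarn) {
        float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore / numProcessors;
        return (int) (cpuUsageTotalCoresPercentage * 1000
            * maxVCores / nodeCpuPercentageForYarn);
    }

    // recordUsage() stores memory in MB via a right shift by 20 bits
    static int bytesToMb(long bytes) {
        return (int) (bytes >> 20);
    }

    public static void main(String[] args) {
        // 6-core machine, 3 cores busy: 300% per core, 50% of all cores
        int milli = milliVcoresUsed(300.0f, 6, 8, 100);
        System.out.println(milli + " milli-vcores = " + (milli / 1000.0f) + " vcores");
        // prints "4000 milli-vcores = 4.0 vcores"
        System.out.println(bytesToMb(1536L * 1024 * 1024) + " MB"); // prints "1536 MB"
    }
}
```

On a 6-core machine with 3 busy cores, 50% of the physical CPU becomes 4000 milli-vcores, i.e. half of the 8 configured vcores. Usage is rescaled to the configured vcore count, not to the physical core count.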

IV. Summary

1. Launching a container fires the ContainerEventType.CONTAINER_LAUNCHED event.

2. ContainerImpl handles the event from step 1 and, as the container starts running, fires the container-monitoring event ContainersMonitorEventType.START_MONITORING_CONTAINER.

3. ContainersMonitorImpl handles the event from step 2 in onStartMonitoringContainer().

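4. The launched container's ID, virtual memory limit, physical memory limit and vcore count are wrapped in a ProcessTreeInfo and put into trackingContainers, the map that tracks all monitored containers.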

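5. During initialization, ContainersMonitorImpl reads: the container monitoring interval (default every 3 s), the container log directory check interval (default every 1 min), the log directory size limit for a single container (default about 1 GB), the total log size limit for all of a container's logs (default about 10 GB), the resource calculator plugin (pluggable; defaults to SysInfoLinux on Linux and SysInfoWindows on Windows), the class that computes process-tree resource utilization, the memory the node sets aside for YARN containers, the number of vcores available to YARN containers, the virtual-to-physical memory ratio, the memory enforcement policies, and so on.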

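6. When ContainersMonitorImpl starts, it launches a thread (monitoringThread) that monitors the containers' resource usage and kills any container that exceeds its limits. By default only this thread runs; oomListenerThread and logMonitorThread are disabled by default.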
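The kill decision in checkLimit() rests on the heuristic described in its comments: a tree using more than twice its limit is killed at once; otherwise only processes older than one monitoring iteration count against the limit. The sketch below models one pass over trackingContainers with that rule, under simplifying assumptions. The MonitorPassSketch and Sample types are hypothetical, a single memory figure stands in for the separate virtual and physical checks, and "killing" just means removing the entry and reporting its id instead of sending a ContainerKillEvent.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of one pass of the monitoring loop. Only the
// "twice the limit, or aged processes over the limit" rule comes from checkLimit().
class MonitorPassSketch {
    // One sample of a container's process tree (all values in bytes)
    static final class Sample {
        final long currentMem;  // memory of the whole tree (age 0); negative = unavailable
        final long agedMem;     // memory of processes older than one iteration
        final long limit;       // per-container limit
        Sample(long currentMem, long agedMem, long limit) {
            this.currentMem = currentMem;
            this.agedMem = agedMem;
            this.limit = limit;
        }
    }

    // Heuristic described for isProcessTreeOverLimit()
    static boolean isOverLimit(long currentMem, long agedMem, long limit) {
        if (currentMem > 2 * limit) {
            return true;        // more than twice the limit: kill right away
        }
        return agedMem > limit; // aged processes over the limit: kill
    }

    // Returns the ids that would be killed and removes them from tracking
    static List<String> runOnce(Map<String, Sample> tracking) {
        List<String> killed = new ArrayList<>();
        Iterator<Map.Entry<String, Sample>> it = tracking.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Sample> e = it.next();
            Sample s = e.getValue();
            if (s.currentMem < 0) {
                continue;       // usage not available yet: look again next pass
            }
            if (isOverLimit(s.currentMem, s.agedMem, s.limit)) {
                it.remove();
                killed.add(e.getKey());
            }
        }
        return killed;
    }
}
```

A container that is over its limit only because of young processes (for example a JVM in the middle of fork()+exec()) is left alone in this pass; if the excess is still there once those processes are older than one interval, the next pass removes it.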




http://www.chinasem.cn/article/755269
