聊聊PowerJob的SystemInfoController

2024-01-29 13:52

本文主要是介绍聊聊PowerJob的SystemInfoController,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的SystemInfoController

SystemInfoController

tech/powerjob/server/web/controller/SystemInfoController.java

@Slf4j
@RestController
@RequestMapping("/system")
@RequiredArgsConstructor
public class SystemInfoController {private final JobInfoRepository jobInfoRepository;private final InstanceInfoRepository instanceInfoRepository;private final ServerInfoService serverInfoService;private final WorkerClusterQueryService workerClusterQueryService;@GetMapping("/listWorker")public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {List<WorkerInfo> workerInfos = workerClusterQueryService.getAllWorkers(appId);return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList()));}@GetMapping("/overview")public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {SystemOverviewVO overview = new SystemOverviewVO();// 总任务数量overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));// 运行任务数overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));// 近期失败任务数(24H内)Date date = DateUtils.addDays(new Date(), -1);overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));// 服务器时区overview.setTimezone(TimeZone.getDefault().getDisplayName());// 服务器时间overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));overview.setServerInfo(serverInfoService.fetchServiceInfo());return ResultDTO.success(overview);}}

SystemInfoController提供了listWorker、getSystemOverview方法;listWorker则是根据当前登录的appId来获取其WorkerInfo;getSystemOverview则是统计了当前appId的总任务数量、运行任务数、近期失败任务数

getAllWorkers

tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

    @DesignateServerpublic List<WorkerInfo> getAllWorkers(Long appId) {List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());return workers;}private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);if (clusterStatusHolder == null) {log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);return Collections.emptyMap();}return clusterStatusHolder.getAllWorkers();}public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {return WorkerClusterManagerService.getAppId2ClusterStatus();}        

getAllWorkers通过getWorkerInfosByAppId获取WorkerInfo,然后根据getSystemMetrics().calculateScore()进行排序

WorkerClusterManagerService

tech/powerjob/server/remote/worker/WorkerClusterManagerService.java

@Slf4j
public class WorkerClusterManagerService {/*** 存储Worker健康信息,appId -> ClusterStatusHolder*/private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();/*** 更新状态* @param heartbeat Worker的心跳包*/public static void updateStatus(WorkerHeartbeat heartbeat) {Long appId = heartbeat.getAppId();String appName = heartbeat.getAppName();ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));clusterStatusHolder.updateStatus(heartbeat);}/*** 清理不需要的worker信息* @param usingAppIds 需要维护的appId,其余的数据将被删除*/public static void clean(List<Long> usingAppIds) {Set<Long> keys = Sets.newHashSet(usingAppIds);APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));}/*** 清理缓存信息,防止 OOM*/public static void cleanUp() {APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);}protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {return APP_ID_2_CLUSTER_STATUS;}}

WorkerClusterManagerService定义了APP_ID_2_CLUSTER_STATUS,维护了appId到具体ClusterStatusHolder的映射;其中updateStatus接收WorkerHeartbeat,然后执行clusterStatusHolder.updateStatus(heartbeat)

updateStatus

tech/powerjob/server/remote/worker/ClusterStatusHolder.java

    public void updateStatus(WorkerHeartbeat heartbeat) {String workerAddress = heartbeat.getWorkerAddress();long heartbeatTime = heartbeat.getHeartbeatTime();WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {WorkerInfo wf = new WorkerInfo();wf.refresh(heartbeat);return wf;});long oldTime = workerInfo.getLastActiveTime();if (heartbeatTime < oldTime) {log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());return;}workerInfo.refresh(heartbeat);List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();if (!CollectionUtils.isEmpty(containerInfos)) {containerInfos.forEach(containerInfo -> {Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());infos.put(workerAddress, containerInfo);});}}

updateStatus方法先根据workerAddress获取workerInfo,若heartbeatTime大于等于lastActiveTime则执行workerInfo.refresh(heartbeat),同时更新containerInfos

getSystemMetrics

tech/powerjob/worker/common/utils/SystemInfoUtils.java

public class SystemInfoUtils {private static final NumberFormat NF = NumberFormat.getNumberInstance();static {NF.setMaximumFractionDigits(4);NF.setMinimumFractionDigits(4);NF.setRoundingMode(RoundingMode.HALF_UP);// 不按照千分位输出NF.setGroupingUsed(false);}// JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.private static final Runtime runtime = Runtime.getRuntime();private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();public static SystemMetrics getSystemMetrics() {SystemMetrics metrics = new SystemMetrics();fillCPUInfo(metrics);fillMemoryInfo(metrics);fillDiskInfo(metrics);// 在Worker完成分数计算,减小Server压力metrics.calculateScore();return metrics;}private static void fillCPUInfo(SystemMetrics metrics) {metrics.setCpuProcessors(osMXBean.getAvailableProcessors());metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage()));}private static void fillMemoryInfo(SystemMetrics metrics) {// JVM内存信息(maxMemory指JVM能从操作系统获取的最大内存,即-Xmx参数设置的值,totalMemory指JVM当前持久的总内存)long maxMemory = runtime.maxMemory();long usedMemory = runtime.totalMemory() - runtime.freeMemory();metrics.setJvmMaxMemory(bytes2GB(maxMemory));// 已使用内存:当前申请总量 - 当前空余量metrics.setJvmUsedMemory(bytes2GB(usedMemory));// 已用内存比例metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));}private static void fillDiskInfo(SystemMetrics metrics) {long free = 0;long total = 0;File[] roots = File.listRoots();for (File file : roots) {free += file.getFreeSpace();total += file.getTotalSpace();}metrics.setDiskUsed(bytes2GB(total - free));metrics.setDiskTotal(bytes2GB(total));metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));}private static double bytes2GB(long bytes) {return miniDouble(bytes / 1024.0 / 1024 / 1024);}private static double miniDouble(double origin) {return Double.parseDouble(NF.format(origin));}}

SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore();cpu信息通过osMXBean.getAvailableProcessors()、osMXBean.getSystemLoadAverage()获取;memory信息通过Runtime获取;disk信息则通过遍历File.listRoots()去统计freeSpace及totalSpace

calculateScore

tech/powerjob/common/model/SystemMetrics.java

    public int calculateScore() {if (score > 0) {return score;}// Memory is vital to TaskTracker, so we set the multiplier factor as 2.double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;// Calculate the remaining load of CPU. Multiplier is set as 1.double cpuScore = cpuProcessors - cpuLoad;// Windows can not fetch CPU load, set cpuScore as 1.if (cpuScore > cpuProcessors) {cpuScore = 1;}score = (int) (memScore + cpuScore);return score;}

SystemMetrics的calculateScore则是由memScore、cpuScore两部分相加而成;memScore为(jvmMaxMemory - jvmUsedMemory) * 2,cpuScore为cpuProcessors - cpuLoad

小结

SystemInfoController提供了listWorker、getSystemOverview方法;listWorker则是根据当前登录的appId来获取其WorkerInfo;getSystemOverview则是统计了当前appId的总任务数量、运行任务数、近期失败任务数;WorkerClusterManagerService定义了APP_ID_2_CLUSTER_STATUS,维护了appId到具体ClusterStatusHolder的映射;其中updateStatus接收WorkerHeartbeat,然后执行clusterStatusHolder.updateStatus(heartbeat);WorkerInfo包含了SystemMetrics,SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore()。

这篇关于聊聊PowerJob的SystemInfoController的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/657137

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH