Preface
This article takes a look at PowerJob's SystemInfoController.
SystemInfoController
tech/powerjob/server/web/controller/SystemInfoController.java
@Slf4j
@RestController
@RequestMapping("/system")
@RequiredArgsConstructor
public class SystemInfoController {

    private final JobInfoRepository jobInfoRepository;
    private final InstanceInfoRepository instanceInfoRepository;
    private final ServerInfoService serverInfoService;
    private final WorkerClusterQueryService workerClusterQueryService;

    @GetMapping("/listWorker")
    public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
        List<WorkerInfo> workerInfos = workerClusterQueryService.getAllWorkers(appId);
        return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList()));
    }

    @GetMapping("/overview")
    public ResultDTO<SystemOverviewVO> getSystemOverview(Long appId) {
        SystemOverviewVO overview = new SystemOverviewVO();
        // Total number of jobs
        overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV()));
        // Number of running instances
        overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
        // Number of recently failed instances (within the last 24 hours)
        Date date = DateUtils.addDays(new Date(), -1);
        overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
        // Server time zone
        overview.setTimezone(TimeZone.getDefault().getDisplayName());
        // Server time
        overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN));
        overview.setServerInfo(serverInfoService.fetchServiceInfo());
        return ResultDTO.success(overview);
    }
}
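SystemInfoController provides two methods, listWorker and getSystemOverview. listWorker returns the WorkerInfo list for the appId passed in (the app currently logged in to). getSystemOverview reports, for that appId, the total number of jobs, the number of running instances, and the number of instances that failed within the last 24 hours, together with the server's time zone, server time, and server info.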
getAllWorkers
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
@DesignateServer
public List<WorkerInfo> getAllWorkers(Long appId) {
    List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values());
    workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
    return workers;
}

private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
    ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
    if (clusterStatusHolder == null) {
        log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
        return Collections.emptyMap();
    }
    return clusterStatusHolder.getAllWorkers();
}

public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
    return WorkerClusterManagerService.getAppId2ClusterStatus();
}
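getAllWorkers fetches the WorkerInfo map through getWorkerInfosByAppId, copies its values into a list, and sorts that list by getSystemMetrics().calculateScore() in descending order, so workers with higher scores come first. If no ClusterStatusHolder exists for the appId yet, it logs a warning and returns an empty list.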
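The descending sort can be sketched in isolation as follows. ScoredWorker and WorkerSortSketch are hypothetical stand-ins introduced only for this illustration, not PowerJob classes, and the sketch uses Comparator.comparingInt(...).reversed() in place of the subtraction-based comparator from the source; the resulting order is the same for small scores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for WorkerInfo: just an address and a precomputed score.
final class ScoredWorker {
    final String address;
    final int score;

    ScoredWorker(String address, int score) {
        this.address = address;
        this.score = score;
    }
}

final class WorkerSortSketch {
    // Returns a new list ordered from the highest score to the lowest.
    static List<ScoredWorker> sortByScoreDesc(List<ScoredWorker> workers) {
        List<ScoredWorker> sorted = new ArrayList<>(workers);
        sorted.sort(Comparator.comparingInt((ScoredWorker w) -> w.score).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<ScoredWorker> workers = Arrays.asList(
                new ScoredWorker("192.168.0.1:27777", 3),
                new ScoredWorker("192.168.0.2:27777", 9),
                new ScoredWorker("192.168.0.3:27777", 5));
        for (ScoredWorker w : sortByScoreDesc(workers)) {
            System.out.println(w.address + " score=" + w.score);
        }
    }
}
```

Comparing with comparingInt avoids the integer overflow that a subtraction-based comparator can hit with extreme values, although that is unlikely to matter for scores of this size.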
WorkerClusterManagerService
tech/powerjob/server/remote/worker/WorkerClusterManagerService.java
@Slf4j
public class WorkerClusterManagerService {

    /**
     * Stores worker health information: appId -> ClusterStatusHolder
     */
    private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();

    /**
     * Updates the status
     * @param heartbeat the worker's heartbeat packet
     */
    public static void updateStatus(WorkerHeartbeat heartbeat) {
        Long appId = heartbeat.getAppId();
        String appName = heartbeat.getAppName();
        ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
        clusterStatusHolder.updateStatus(heartbeat);
    }

    /**
     * Removes worker information that is no longer needed
     * @param usingAppIds the appIds to keep; data for all other appIds is deleted
     */
    public static void clean(List<Long> usingAppIds) {
        Set<Long> keys = Sets.newHashSet(usingAppIds);
        APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
    }

    /**
     * Cleans up cached information to prevent OOM
     */
    public static void cleanUp() {
        APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);
    }

    protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return APP_ID_2_CLUSTER_STATUS;
    }
}
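WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, a concurrent map from appId to its ClusterStatusHolder. updateStatus receives a WorkerHeartbeat, creates the holder for that appId on first sight via computeIfAbsent, and then calls clusterStatusHolder.updateStatus(heartbeat). clean drops every entry whose appId is not in usingAppIds, and cleanUp calls release on each holder.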
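A minimal sketch of the same registry pattern, assuming a plain nested map in place of ClusterStatusHolder. ClusterRegistrySketch and its method names are hypothetical and exist only for this illustration; the two JDK calls it relies on, ConcurrentHashMap.computeIfAbsent and removeIf on the key set, play the same roles as in the source.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class ClusterRegistrySketch {
    // appId -> (worker address -> last heartbeat time); a simplified stand-in for ClusterStatusHolder.
    private final Map<Long, Map<String, Long>> appId2Cluster = new ConcurrentHashMap<>();

    // Creates the per-app holder on the first heartbeat, then records the heartbeat time.
    void onHeartbeat(long appId, String workerAddress, long heartbeatTime) {
        appId2Cluster
                .computeIfAbsent(appId, ignore -> new ConcurrentHashMap<>())
                .put(workerAddress, heartbeatTime);
    }

    // Keeps only the apps listed in usingAppIds and drops everything else.
    void clean(List<Long> usingAppIds) {
        Set<Long> keep = new HashSet<>(usingAppIds);
        appId2Cluster.keySet().removeIf(appId -> !keep.contains(appId));
    }

    boolean hasApp(long appId) {
        return appId2Cluster.containsKey(appId);
    }

    int appCount() {
        return appId2Cluster.size();
    }

    public static void main(String[] args) {
        ClusterRegistrySketch registry = new ClusterRegistrySketch();
        registry.onHeartbeat(1L, "10.0.0.1:27777", 1000L);
        registry.onHeartbeat(2L, "10.0.0.2:27777", 1000L);
        registry.clean(Arrays.asList(1L));
        System.out.println("apps left: " + registry.appCount());
    }
}
```

computeIfAbsent makes the "create on first heartbeat" step atomic per key, which is why neither the source nor this sketch needs an explicit lock around it.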
updateStatus
tech/powerjob/server/remote/worker/ClusterStatusHolder.java
public void updateStatus(WorkerHeartbeat heartbeat) {
    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();

    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }

    workerInfo.refresh(heartbeat);

    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
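The updateStatus method first looks up the WorkerInfo for workerAddress, creating it if absent. If heartbeatTime is earlier than lastActiveTime, the heartbeat is treated as expired: the method logs a warning and returns. Otherwise it calls workerInfo.refresh(heartbeat) and records the heartbeat's containerInfos in containerId2Infos, keyed by containerId and then by workerAddress.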
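The expired-heartbeat check can be illustrated with a small sketch. HeartbeatFilterSketch is a hypothetical class that tracks only the last active time per worker address; it is not how ClusterStatusHolder is implemented. Like the source, it does a check followed by a write, so it is not atomic across the two steps.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class HeartbeatFilterSketch {
    // worker address -> time of the most recent accepted heartbeat
    private final Map<String, Long> lastActiveTime = new ConcurrentHashMap<>();

    // Returns true when the heartbeat is applied, false when it is discarded as expired.
    boolean accept(String workerAddress, long heartbeatTime) {
        Long old = lastActiveTime.putIfAbsent(workerAddress, heartbeatTime);
        if (old != null && heartbeatTime < old) {
            // Older than what we already have: ignore it, as the source does after logging a warning.
            return false;
        }
        lastActiveTime.put(workerAddress, heartbeatTime);
        return true;
    }

    long lastActive(String workerAddress) {
        return lastActiveTime.get(workerAddress);
    }

    public static void main(String[] args) {
        HeartbeatFilterSketch filter = new HeartbeatFilterSketch();
        System.out.println(filter.accept("10.0.0.1:27777", 100L)); // first heartbeat
        System.out.println(filter.accept("10.0.0.1:27777", 90L));  // arrives late, older timestamp
        System.out.println(filter.lastActive("10.0.0.1:27777"));
    }
}
```

A heartbeat whose time equals the stored value is still accepted, matching the strict `<` comparison in the source.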
getSystemMetrics
tech/powerjob/worker/common/utils/SystemInfoUtils.java
public class SystemInfoUtils {

    private static final NumberFormat NF = NumberFormat.getNumberInstance();
    static {
        NF.setMaximumFractionDigits(4);
        NF.setMinimumFractionDigits(4);
        NF.setRoundingMode(RoundingMode.HALF_UP);
        // Do not use thousands grouping in the output
        NF.setGroupingUsed(false);
    }

    // JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
    private static final Runtime runtime = Runtime.getRuntime();
    private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();

    public static SystemMetrics getSystemMetrics() {
        SystemMetrics metrics = new SystemMetrics();
        fillCPUInfo(metrics);
        fillMemoryInfo(metrics);
        fillDiskInfo(metrics);
        // Compute the score on the worker to reduce the load on the server
        metrics.calculateScore();
        return metrics;
    }

    private static void fillCPUInfo(SystemMetrics metrics) {
        metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
        metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage()));
    }

    private static void fillMemoryInfo(SystemMetrics metrics) {
        // JVM memory info (maxMemory is the most memory the JVM can obtain from the OS, i.e. the value set by -Xmx; totalMemory is the total memory the JVM currently holds)
        long maxMemory = runtime.maxMemory();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        metrics.setJvmMaxMemory(bytes2GB(maxMemory));
        // Used memory: currently allocated total minus currently free
        metrics.setJvmUsedMemory(bytes2GB(usedMemory));
        // Ratio of used memory
        metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));
    }

    private static void fillDiskInfo(SystemMetrics metrics) {
        long free = 0;
        long total = 0;
        File[] roots = File.listRoots();
        for (File file : roots) {
            free += file.getFreeSpace();
            total += file.getTotalSpace();
        }
        metrics.setDiskUsed(bytes2GB(total - free));
        metrics.setDiskTotal(bytes2GB(total));
        metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));
    }

    private static double bytes2GB(long bytes) {
        return miniDouble(bytes / 1024.0 / 1024 / 1024);
    }

    private static double miniDouble(double origin) {
        return Double.parseDouble(NF.format(origin));
    }
}
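SystemInfoUtils provides the getSystemMetrics method. It fills in CPU, memory, and disk information through fillCPUInfo, fillMemoryInfo, and fillDiskInfo, and finally calls metrics.calculateScore() so that the score is computed on the worker side. CPU information comes from osMXBean.getAvailableProcessors() and osMXBean.getSystemLoadAverage(); memory information comes from Runtime; disk information is gathered by iterating over File.listRoots() and summing freeSpace and totalSpace.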
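The same JDK calls can be tried out with a short standalone sketch. HostMetricsSketch is a hypothetical class written for this illustration; it returns raw ratios instead of the four-decimal rounding done by SystemInfoUtils, and it adds a guard for a zero disk total that the source does not have.

```java
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

final class HostMetricsSketch {
    // Used JVM heap as a fraction of the maximum heap (the -Xmx limit).
    static double jvmMemoryUsage() {
        Runtime runtime = Runtime.getRuntime();
        long used = runtime.totalMemory() - runtime.freeMemory();
        return (double) used / runtime.maxMemory();
    }

    // Used disk space as a fraction of total space, summed over all filesystem roots.
    static double diskUsage() {
        long free = 0;
        long total = 0;
        for (File root : File.listRoots()) {
            free += root.getFreeSpace();
            total += root.getTotalSpace();
        }
        // Guard added for this sketch: avoid dividing by zero when no root reports a size.
        return total == 0 ? 0.0 : (double) (total - free) / total;
    }

    // System load average over the last minute; negative when the platform cannot provide it.
    static double cpuLoad() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        return os.getSystemLoadAverage();
    }

    public static void main(String[] args) {
        System.out.println("processors     = " + Runtime.getRuntime().availableProcessors());
        System.out.println("cpuLoad        = " + cpuLoad());
        System.out.println("jvmMemoryUsage = " + jvmMemoryUsage());
        System.out.println("diskUsage      = " + diskUsage());
    }
}
```

The printed values depend on the machine, so no expected output is shown. On platforms where getSystemLoadAverage() is unavailable it returns a negative value, which is the case the Windows branch in calculateScore accounts for.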
calculateScore
tech/powerjob/common/model/SystemMetrics.java
public int calculateScore() {
    if (score > 0) {
        return score;
    }
    // Memory is vital to TaskTracker, so we set the multiplier factor as 2.
    double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
    // Calculate the remaining load of CPU. Multiplier is set as 1.
    double cpuScore = cpuProcessors - cpuLoad;
    // Windows can not fetch CPU load, set cpuScore as 1.
    if (cpuScore > cpuProcessors) {
        cpuScore = 1;
    }
    score = (int) (memScore + cpuScore);
    return score;
}
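SystemMetrics.calculateScore adds two parts, memScore and cpuScore: memScore is (jvmMaxMemory - jvmUsedMemory) * 2 and cpuScore is cpuProcessors - cpuLoad. When cpuScore comes out larger than cpuProcessors, which happens when the CPU load cannot be read (for example on Windows), cpuScore is set to 1. The sum is truncated to an int and cached, so later calls return the stored score as long as it is positive.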
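A worked example makes the arithmetic concrete. ScoreSketch is a hypothetical helper that repeats the formula from calculateScore as a pure function, without the caching in the score field; the input values are made up for illustration.

```java
final class ScoreSketch {
    // Same formula as SystemMetrics.calculateScore, without the cached score field.
    static int score(double jvmMaxMemory, double jvmUsedMemory, int cpuProcessors, double cpuLoad) {
        // Free JVM memory in GB, weighted by 2.
        double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
        // Remaining CPU capacity, weighted by 1.
        double cpuScore = cpuProcessors - cpuLoad;
        // A negative load (load unavailable) would inflate the score, so fall back to 1.
        if (cpuScore > cpuProcessors) {
            cpuScore = 1;
        }
        return (int) (memScore + cpuScore);
    }

    public static void main(String[] args) {
        // 4 GB max heap, 1.5 GB used, 8 cores, load 2.5:
        // memScore = 5.0, cpuScore = 5.5, sum = 10.5, truncated to 10
        System.out.println(score(4.0, 1.5, 8, 2.5));  // prints 10
        // Same memory, but the load is reported as -1 (unavailable):
        // cpuScore = 9 > 8, so it becomes 1; sum = 6.0
        System.out.println(score(4.0, 1.5, 8, -1.0)); // prints 6
    }
}
```

Because the result is cast to int, the fractional part is dropped rather than rounded, so two workers whose raw sums differ by less than 1 can end up with the same score.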
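Summary
SystemInfoController provides the listWorker and getSystemOverview methods. listWorker returns the WorkerInfo list for the given appId; getSystemOverview reports that appId's total number of jobs, number of running instances, and number of recently failed instances. WorkerClusterManagerService defines APP_ID_2_CLUSTER_STATUS, which maps each appId to its ClusterStatusHolder; its updateStatus receives a WorkerHeartbeat and then calls clusterStatusHolder.updateStatus(heartbeat). WorkerInfo carries a SystemMetrics; SystemInfoUtils provides getSystemMetrics, which fills in CPU, memory, and disk information through fillCPUInfo, fillMemoryInfo, and fillDiskInfo and finally calls metrics.calculateScore().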