聊聊PowerJob的任务调度

2024-02-10 02:52

本文主要是介绍聊聊PowerJob的任务调度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的任务调度

CoreScheduleTaskManager

tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java

@Service
@Slf4j
@RequiredArgsConstructor
public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {private final PowerScheduleService powerScheduleService;private final InstanceStatusCheckService instanceStatusCheckService;private final List<Thread> coreThreadContainer = new ArrayList<>();@SuppressWarnings("AlibabaAvoidManuallyCreateThread")@Overridepublic void afterPropertiesSet() {// 定时调度coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));// 数据清理coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));// 状态检查coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));coreThreadContainer.forEach(Thread::start);}//......
}    

CoreScheduleTaskManager在afterPropertiesSet的时候会启动一系列的线程,它们都是LoopRunnable类型的,分别调度powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)、powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)、powerScheduleService::scheduleCronWorkflow、powerScheduleService::scheduleFrequentJob、powerScheduleService::cleanData、instanceStatusCheckService::checkRunningInstance、instanceStatusCheckService::checkWaitingDispatchInstance、instanceStatusCheckService::checkWaitingWorkerReceiveInstance、instanceStatusCheckService::checkWorkflowInstance

LoopRunnable

    @RequiredArgsConstructorprivate static class LoopRunnable implements Runnable {private final String taskName;private final Long runningInterval;private final Runnable innerRunnable;@SuppressWarnings("BusyWait")@Overridepublic void run() {log.info("start task : {}.", taskName);while (true) {try {innerRunnable.run();Thread.sleep(runningInterval);} catch (InterruptedException e) {log.warn("[{}] task has been interrupted!", taskName, e);break;} catch (Exception e) {log.error("[{}] task failed!", taskName, e);}}}}

LoopRunnable的构造器接收taskName、runningInterval、innerRunnable三个参数,其run方法通过while true循环内部执行innerRunnable.run(),执行完sleep指定的runningInterval,若捕获到InterruptedException则break跳出循环,若其他异常则打印error日志

PowerScheduleService

PowerScheduleService主要提供了scheduleNormalJob、scheduleCronWorkflow、scheduleFrequentJob、cleanData方法

scheduleNormalJob

tech/powerjob/server/core/scheduler/PowerScheduleService.java

    public void scheduleNormalJob(TimeExpressionType timeExpressionType) {long start = System.currentTimeMillis();// 调度 CRON 表达式 JOBtry {final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());if (CollectionUtils.isEmpty(allAppIds)) {log.info("[NormalScheduler] current server has no app's job to schedule.");return;}scheduleNormalJob0(timeExpressionType, allAppIds);} catch (Exception e) {log.error("[NormalScheduler] schedule cron job failed.", e);}long cost = System.currentTimeMillis() - start;log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);if (cost > SCHEDULE_RATE) {log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);}}

scheduleNormalJob方法主要是查询当前server负责的appId列表,然后内部委托改为scheduleNormalJob0

scheduleNormalJob0

    private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {long nowTime = System.currentTimeMillis();long timeThreshold = nowTime + 2 * SCHEDULE_RATE;Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {try {// 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);if (CollectionUtils.isEmpty(jobInfos)) {return;}// 1. 批量写日志表Map<Long, Long> jobId2InstanceId = Maps.newHashMap();log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);jobInfos.forEach(jobInfo -> {Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();jobId2InstanceId.put(jobInfo.getId(), instanceId);});instanceInfoRepository.flush();// 2. 推入时间轮中等待调度执行jobInfos.forEach(jobInfoDO -> {Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());long targetTriggerTime = jobInfoDO.getNextTriggerTime();long delay = 0;if (targetTriggerTime < nowTime) {log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis());} else {delay = targetTriggerTime - nowTime;}InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));});// 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms)jobInfos.forEach(jobInfoDO -> {try {refreshJob(timeExpressionType, jobInfoDO);} catch (Exception e) {log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);}});jobInfoRepository.flush();} catch (Exception e) {log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);}});}

scheduleNormalJob0主要是调度CRON、DAILY_TIME_INTERVAL类型的任务,它通过jobInfoRepository查找指定appId、状态启用、指定TimeExpressionType,以及NextTriggerTime小于等于nowTime + 2 * SCHEDULE_RATE的任务,然后挨个执行instanceService.create创建任务实例,然后放入到InstanceTimeWheelService.schedule进行调度,最后计算和更新一下每个job的nextTriggerTime

scheduleCronWorkflow

    public void scheduleCronWorkflow() {long start = System.currentTimeMillis();// 调度 CRON 表达式 WORKFLOWtry {final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());if (CollectionUtils.isEmpty(allAppIds)) {log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");return;}scheduleWorkflowCore(allAppIds);} catch (Exception e) {log.error("[CronWorkflowSchedule] schedule cron workflow failed.", e);}long cost = System.currentTimeMillis() - start;log.info("[CronWorkflowSchedule] cron workflow schedule use {} ms.", cost);if (cost > SCHEDULE_RATE) {log.warn("[CronWorkflowSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);}}

scheduleCronWorkflow主要是调度CRON 表达式 WORKFLOW,内部委托给scheduleWorkflowCore

scheduleFrequentJob

    public void scheduleFrequentJob() {long start = System.currentTimeMillis();// 调度 FIX_RATE/FIX_DELAY 表达式 JOBtry {final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());if (CollectionUtils.isEmpty(allAppIds)) {log.info("[FrequentJobSchedule] current server has no app's job to schedule.");return;}scheduleFrequentJobCore(allAppIds);} catch (Exception e) {log.error("[FrequentJobSchedule] schedule frequent job failed.", e);}long cost = System.currentTimeMillis() - start;log.info("[FrequentJobSchedule] frequent job schedule use {} ms.", cost);if (cost > SCHEDULE_RATE) {log.warn("[FrequentJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);}}

scheduleFrequentJob主要是调度FIX_RATE/FIX_DELAY 表达式 JOB,内部委托给了scheduleFrequentJobCore

scheduleFrequentJobCore

    private void scheduleFrequentJobCore(List<Long> appIds) {Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {try {// 查询所有的秒级任务(只包含ID)List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES);if (CollectionUtils.isEmpty(jobIds)) {return;}// 查询日志记录表中是否存在相关的任务List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);List<Long> notRunningJobIds = Lists.newLinkedList();jobIds.forEach(jobId -> {if (!runningJobIdSet.contains(jobId)) {notRunningJobIds.add(jobId);}});if (CollectionUtils.isEmpty(notRunningJobIds)) {return;}notRunningJobIds.forEach(jobId -> {Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);jobInfoOpt.ifPresent(jobInfoDO -> {LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle());// 生命周期已经结束if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) {jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV());jobInfoDO.setGmtModified(new Date());jobInfoRepository.saveAndFlush(jobInfoDO);log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());} else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());}});});} catch (Exception e) {log.error("[FrequentScheduler] schedule frequent job failed.", e);}});}

scheduleFrequentJobCore主要是调度秒级任务,它先找出秒级任务的id,然后过滤掉正在运行的任务,剩下的未运行的任务挨个判断是否需要调度,需要则执行jobService.runJob

cleanData

    public void cleanData() {try {final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());if (allAppIds.isEmpty()) {return;}WorkerClusterManagerService.clean(allAppIds);} catch (Exception e) {log.error("[CleanData] clean data failed.", e);}}

cleanData主要是通过WorkerClusterManagerService.clean来维护当前server负责的appId缓存

InstanceStatusCheckService

InstanceStatusCheckService提供了checkRunningInstance、checkWaitingDispatchInstance、checkWaitingWorkerReceiveInstance、checkWorkflowInstance方法

小结

PowerJob的CoreScheduleTaskManager在afterPropertiesSet的时候会启动一系列的线程,它们都是LoopRunnable类型的,其中scheduleNormalJob主要是调度CRON、DAILY_TIME_INTERVAL类型的任务,scheduleCronWorkflow主要是调度CRON 表达式 WORKFLOW任务,scheduleFrequentJob主要是调度FIX_RATE/FIX_DELAY 表达式 JOB。

这篇关于聊聊PowerJob的任务调度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/695967

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

XXL-JOB实践:从零开始构建你的任务调度系统

目录 一、序言1、系统组成2、架构图 二、部署调度中心1、下载源码2、执行数据库脚本3、修改application.properties配置文件4、启动调度中心 三、部署执行器1、引入依赖2、执行器配置2.1 XxlJobProperties属性文件2.2 XxlJobConfig配置类2.3 XxlJobHanlder作业处理器2.4 application.yml 3、启动执行器 四、调

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

XXL-JOB分布式任务调度教程(持续更新~)

先大致声明一下流程(具体细节在下面哦~)  步骤: 1.下载xxl-job并配置以及启动 2.导入对应maven坐标 3.配置对应的配置文件以及编写对应的配置类config 4.编写要触发的方法并且给方法打上@XXlJob("")注解 5.设置xxl-Job平台上的任务    5.1创建执行器  5.2创建任务,5,3配置任务具体细节(比如  (1触发执行器,(2执行时间,(3运行模式,(4以

Linux操作系统学习笔记(七)任务调度

一. 前言   在前文中,我们分析了内核中进程和线程的统一结构体task_struct,并分析进程、线程的创建和派生的过程。在本文中,我们会对任务间调度进行详细剖析,了解其原理和整个执行过程。由此,进程、线程部分的大体框架就算是介绍完了。本节主要分为三个部分:Linux内核中常见的调度策略,调度的基本结构体以及调度发生的整个流程。下面将详细展开说明。 二. 调度策略   Linux的调度策略主

Linux基础 -- pthread之线程池任务调度

线程池任务依赖设计方案 1. 设计目标 为了在多线程环境中支持任务间的依赖关系,我们设计了一个基于 pthread_create 封装的线程池,任务之间可以设置依赖,只有在依赖的任务完成后,依赖任务才会被执行。该设计目标是简化任务调度的逻辑,让开发者可以专注于任务的编写,而不必关注复杂的线程管理和任务依赖的执行顺序。 2. 核心概念 2.1 任务(Task) 任务是线程池中执行的最小单位

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试