聊聊PowerJob的TimingStrategyHandler

2024-01-10 06:44

本文主要是介绍聊聊PowerJob的TimingStrategyHandler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的TimingStrategyHandler

TimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java

public interface TimingStrategyHandler {/*** 校验表达式** @param timeExpression 时间表达式*/void validate(String timeExpression);/*** 计算下次触发时间** @param preTriggerTime 上次触发时间 (not null)* @param timeExpression 时间表达式* @param startTime      开始时间(include)* @param endTime        结束时间(include)* @return next trigger time*/Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);/*** 支持的定时策略** @return TimeExpressionType*/TimeExpressionType supportType();}

TimingStrategyHandler接口定义了validate、calculateNextTriggerTime、supportType方法

TimeExpressionType

tech/powerjob/common/enums/TimeExpressionType.java

@Getter
@AllArgsConstructor
@ToString
public enum TimeExpressionType {API(1),CRON(2),FIXED_RATE(3),FIXED_DELAY(4),WORKFLOW(5),DAILY_TIME_INTERVAL(11);private final int v;public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));/*** 首次计算触发时间时必须计算出一个有效值*/public static final List<Integer> INSPECT_TYPES =  Collections.unmodifiableList(Lists.newArrayList(CRON.v, DAILY_TIME_INTERVAL.v));public static TimeExpressionType of(int v) {for (TimeExpressionType type : values()) {if (type.v == v) {return type;}}throw new IllegalArgumentException("unknown TimeExpressionType of " + v);}
}

TimeExpressionType枚举定义了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL几种类型

AbstractTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java

public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {@Overridepublic void validate(String timeExpression) {// do nothing}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {// do nothingreturn null;}
}

AbstractTimingStrategyHandler实现了TimingStrategyHandler的validate、calculateNextTriggerTime方法

ApiTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java

@Component
public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.API;}
}

ApiTimingStrategyHandler继承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.API

FixedRateTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java

@Component
public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic void validate(String timeExpression) {long delay;try {delay = Long.parseLong(timeExpression);} catch (Exception e) {throw new PowerJobException("invalid timeExpression!");}// 默认 120s ,超过这个限制应该使用考虑使用其他类型以减少资源占用int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));if (delay > maxInterval) {throw new PowerJobException("the rate must be less than " + maxInterval + "ms");}if (delay <= 0) {throw new PowerJobException("the rate must be greater than 0 ms");}}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {long r = startTime != null && startTime > preTriggerTime? startTime : preTriggerTime + Long.parseLong(timeExpression);return endTime != null && endTime < r ? null : r;}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.FIXED_RATE;}
}

FixedRateTimingStrategyHandler继承了AbstractTimingStrategyHandler,其validate方法校验interval参数,要求大于0而且不能大于120s;calculateNextTriggerTime方法先根据startTime、preTriggerTime、timeExpression计算再与endTime做比较;其supportType返回的是TimeExpressionType.FIXED_RATE

FixedDelayTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java

@Component
public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic void validate(String timeExpression) {long delay;try {delay = Long.parseLong(timeExpression);} catch (Exception e) {throw new PowerJobException("invalid timeExpression!");}// 默认 120s ,超过这个限制应该考虑使用其他类型以减少资源占用int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));if (delay > maxInterval) {throw new PowerJobException("the delay must be less than " + maxInterval + "ms");}if (delay <= 0) {throw new PowerJobException("the delay must be greater than 0 ms");}}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.FIXED_DELAY;}
}

FixedDelayTimingStrategyHandler继承了AbstractTimingStrategyHandler,其validate要求delay大于0且小于等于120s;其supportType返回的是TimeExpressionType.FIXED_DELAY

WorkflowTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java

@Component
public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.WORKFLOW;}
}

WorkflowTimingStrategyHandler继承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.WORKFLOW

CronTimingStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java

@Component
public class CronTimingStrategyHandler implements TimingStrategyHandler {private final CronParser cronParser;/*** @see CronDefinitionBuilder#instanceDefinitionFor* <p>* Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter.* https://github.com/PowerJob/PowerJob/issues/382*/public CronTimingStrategyHandler() {CronDefinition cronDefinition = CronDefinitionBuilder.defineCron().withSeconds().withValidRange(0, 59).and().withMinutes().withValidRange(0, 59).and().withHours().withValidRange(0, 23).and().withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and().withMonth().withValidRange(1, 12).and().withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and().withYear().withValidRange(1970, 2099).withStrictRange().optional().and().instance();this.cronParser = new CronParser(cronDefinition);}@Overridepublic void validate(String timeExpression) {cronParser.parse(timeExpression);}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {Cron cron = cronParser.parse(timeExpression);ExecutionTime executionTime = ExecutionTime.forCron(cron);if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {// 需要计算出离 startTime 最近的一次真正的触发时间Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);}Instant instant = Instant.ofEpochMilli(preTriggerTime);ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);if (opt.isPresent()) {long nextTriggerTime = opt.get().toEpochSecond() * 1000;if (endTime != null && endTime < nextTriggerTime) {return null;}return nextTriggerTime;}return null;}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.CRON;}
}

CronTimingStrategyHandler实现了TimingStrategyHandler接口,其构造器先创建了CronDefinition,再根据CronDefinition创建CronParser;其validate方法调用了cronParser.parse(timeExpression);calculateNextTriggerTime方法先解析timeExpression,再解析为ExecutionTime,再根据startTime和preTriggerTime计算新的preTriggerTime;最后通过executionTime.nextExecution计算nextTriggerTime;其supportType返回的是TimeExpressionType.CRON

DailyTimeIntervalStrategyHandler

tech/powerjob/server/core/scheduler/auxiliary/impl/DailyTimeIntervalStrategyHandler.java

@Component
public class DailyTimeIntervalStrategyHandler implements TimingStrategyHandler {/*** 使用中国星期!!!*/private static final Set<Integer> ALL_DAY = Sets.newHashSet(1, 2, 3, 4, 5, 6, 7);@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.DAILY_TIME_INTERVAL;}@Override@SneakyThrowspublic void validate(String timeExpression) {DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);CommonUtils.requireNonNull(ep.interval, "interval can't be null or empty in DailyTimeIntervalExpress");CommonUtils.requireNonNull(ep.startTimeOfDay, "startTimeOfDay can't be null or empty in DailyTimeIntervalExpress");CommonUtils.requireNonNull(ep.endTimeOfDay, "endTimeOfDay can't be null or empty in DailyTimeIntervalExpress");TimeOfDay startTime = TimeOfDay.from(ep.startTimeOfDay);TimeOfDay endTime = TimeOfDay.from(ep.endTimeOfDay);if (endTime.before(startTime)) {throw new IllegalArgumentException("endTime should after startTime!");}if (StringUtils.isNotEmpty(ep.intervalUnit)) {TimeUnit.valueOf(ep.intervalUnit);}}@Override@SneakyThrowspublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);// 未开始状态下,用起点算调度时间if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {return calculateInRangeTime(startTime, ep);}// 间隔时间TimeUnit timeUnit = Optional.ofNullable(ep.intervalUnit).map(TimeUnit::valueOf).orElse(TimeUnit.SECONDS);long interval = timeUnit.toMillis(ep.interval);Long ret = calculateInRangeTime(preTriggerTime + interval, ep);if (ret == null || ret <= Optional.ofNullable(endTime).orElse(Long.MAX_VALUE)) {return ret;}return null;}/*** 计算最近一次在范围中的时间* @param time 当前时间基准,可能直接返回该时间作为结果* @param ep 表达式* @return 最近一次在范围中的时间*/static Long calculateInRangeTime(Long time, DailyTimeIntervalExpress ep) {Calendar calendar = Calendar.getInstance();calendar.setTime(new Date(time));int year = calendar.get(Calendar.YEAR);// 月份 + 1,转为熟悉的 1~12 月int month = calendar.get(Calendar.MONTH) + 1;int day = calendar.get(Calendar.DAY_OF_MONTH);// 判断是否符合"日"的执行条件int week = TimeUtils.calculateWeek(year, month, day);Set<Integer> targetDays = CollectionUtils.isEmpty(ep.daysOfWeek) ? ALL_DAY : ep.daysOfWeek;// 未包含情况下,将时间改写为符合条件日的 00:00 分,重新开始递归(这部分应该有性能更优的写法,不过这个调度模式应该很难触发瓶颈,先简单好用的实现)if (!targetDays.contains(week)) {simpleSetCalendar(calendar, 0, 0, 0);Date tomorrowZero = DateUtils.addDays(calendar.getTime(), 1);return calculateInRangeTime(tomorrowZero.getTime(), ep);}// 范围的开始时间TimeOfDay rangeStartTime = TimeOfDay.from(ep.startTimeOfDay);simpleSetCalendar(calendar, rangeStartTime.getHour(), rangeStartTime.getMinute(), rangeStartTime.getSecond());long todayStartTs = calendar.getTimeInMillis();// 未开始if (time < todayStartTs) {return todayStartTs;}TimeOfDay rangeEndTime = TimeOfDay.from(ep.endTimeOfDay);simpleSetCalendar(calendar, rangeEndTime.getHour(), rangeEndTime.getMinute(), rangeEndTime.getSecond());long todayEndTs = calendar.getTimeInMillis();// 范围之间if (time <= todayEndTs) {return time;}// 已结束,重新计算第二天时间simpleSetCalendar(calendar, 0, 0, 0);return calculateInRangeTime(DateUtils.addDays(calendar.getTime(), 1).getTime(), ep);}//......
}    

DailyTimeIntervalStrategyHandler实现了TimingStrategyHandler接口,其supportType返回的是TimeExpressionType.DAILY_TIME_INTERVAL;其validate方法先解析参数为DailyTimeIntervalExpress,然后校验其endTime不能比startTime小;其calculateNextTriggerTime方法主要是通过calculateInRangeTime来计算最近一次在范围中的时间

TimingStrategyService

tech/powerjob/server/core/scheduler/TimingStrategyService.java

@Slf4j
@Service
public class TimingStrategyService {private static final int NEXT_N_TIMES = 5;private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {// initstrategyContainer = new EnumMap<>(TimeExpressionType.class);for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);}}/*** 计算接下来几次的调度时间** @param timeExpressionType 定时表达式类型* @param timeExpression     表达式* @param startTime          起始时间(include)* @param endTime            结束时间(include)* @return 调度时间列表*/public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType);List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES);Long nextTriggerTime = System.currentTimeMillis();do {nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime);if (nextTriggerTime == null) {break;}triggerTimeList.add(nextTriggerTime);} while (triggerTimeList.size() < NEXT_N_TIMES);if (triggerTimeList.isEmpty()) {return TIPS;}return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());}/*** 计算下次的调度时间** @param preTriggerTime     上次触发时间(nullable)* @param timeExpressionType 定时表达式类型* @param timeExpression     表达式* @param startTime          起始时间(include)* @param endTime            结束时间(include)* @return 下次的调度时间*/public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {preTriggerTime = System.currentTimeMillis();}return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);}/*** 计算下次的调度时间并检查校验规则** @param timeExpressionType 定时表达式类型* @param timeExpression     表达式* @param startTime          起始时间(include)* @param endTime            结束时间(include)* @return 下次的调度时间*/public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime);if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {throw new PowerJobException("time expression is out of date: " + timeExpression);}return nextTriggerTime;}public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {if (endTime != null) {if (endTime <= System.currentTimeMillis()) {throw new PowerJobException("lifecycle is out of date!");}if (startTime != null && startTime > endTime) {throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");}}getHandler(timeExpressionType).validate(timeExpression);}private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);if (timingStrategyHandler == null) {throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);}return timingStrategyHandler;}}

TimingStrategyService的构造器遍历timingStrategyHandlers,然后根据其supportType构建Map<TimeExpressionType, TimingStrategyHandler>;其calculateNextTriggerTimes先根据timeExpressionType获取到对应的TimingStrategyHandler,再循环调用TimingStrategyHandler.calculateNextTriggerTime方法来计算nextTriggerTime,最后返回最近5次的调度时间;calculateNextTriggerTimeWithInspection方法会计算nextTriggerTime并针对CRON及DAILY_TIME_INTERVAL类型的要求其不能为null;validate方法调用的是对应TimingStrategyHandler的validate方法

小结

TimingStrategyHandler接口定义了validate、calculateNextTriggerTime、supportType方法;其支持的TimeExpressionType枚举定义了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL几种类型,分别对应了ApiTimingStrategyHandler、CronTimingStrategyHandler、FixedRateTimingStrategyHandler、FixedDelayTimingStrategyHandler、WorkflowTimingStrategyHandler、DailyTimeIntervalStrategyHandler;TimingStrategyService则聚合了这些TimingStrategyHandler,对外提供了calculateNextTriggerTimes、calculateNextTriggerTime、calculateNextTriggerTimeWithInspection、validate方法。

这篇关于聊聊PowerJob的TimingStrategyHandler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/589906

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH