本文主要是介绍聊聊PowerJob的TimingStrategyHandler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
序
本文主要研究一下PowerJob的TimingStrategyHandler
TimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java
public interface TimingStrategyHandler {/*** 校验表达式** @param timeExpression 时间表达式*/void validate(String timeExpression);/*** 计算下次触发时间** @param preTriggerTime 上次触发时间 (not null)* @param timeExpression 时间表达式* @param startTime 开始时间(include)* @param endTime 结束时间(include)* @return next trigger time*/Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);/*** 支持的定时策略** @return TimeExpressionType*/TimeExpressionType supportType();}
TimingStrategyHandler接口定义了validate、calculateNextTriggerTime、supportType方法
TimeExpressionType
tech/powerjob/common/enums/TimeExpressionType.java
@Getter
@AllArgsConstructor
@ToString
public enum TimeExpressionType {API(1),CRON(2),FIXED_RATE(3),FIXED_DELAY(4),WORKFLOW(5),DAILY_TIME_INTERVAL(11);private final int v;public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));/*** 首次计算触发时间时必须计算出一个有效值*/public static final List<Integer> INSPECT_TYPES = Collections.unmodifiableList(Lists.newArrayList(CRON.v, DAILY_TIME_INTERVAL.v));public static TimeExpressionType of(int v) {for (TimeExpressionType type : values()) {if (type.v == v) {return type;}}throw new IllegalArgumentException("unknown TimeExpressionType of " + v);}
}
TimeExpressionType枚举定义了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL几种类型
AbstractTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java
public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {@Overridepublic void validate(String timeExpression) {// do nothing}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {// do nothingreturn null;}
}
AbstractTimingStrategyHandler实现了TimingStrategyHandler的validate、calculateNextTriggerTime方法
ApiTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java
@Component
public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.API;}
}
ApiTimingStrategyHandler继承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.API
FixedRateTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java
@Component
public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic void validate(String timeExpression) {long delay;try {delay = Long.parseLong(timeExpression);} catch (Exception e) {throw new PowerJobException("invalid timeExpression!");}// 默认 120s ,超过这个限制应该使用考虑使用其他类型以减少资源占用int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));if (delay > maxInterval) {throw new PowerJobException("the rate must be less than " + maxInterval + "ms");}if (delay <= 0) {throw new PowerJobException("the rate must be greater than 0 ms");}}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {long r = startTime != null && startTime > preTriggerTime? startTime : preTriggerTime + Long.parseLong(timeExpression);return endTime != null && endTime < r ? null : r;}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.FIXED_RATE;}
}
FixedRateTimingStrategyHandler继承了AbstractTimingStrategyHandler,其validate方法校验interval参数,要求大于0而且不能大于120s;calculateNextTriggerTime方法先根据startTime、preTriggerTime、timeExpression计算再与endTime做比较;其supportType返回的是TimeExpressionType.FIXED_RATE
FixedDelayTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java
@Component
public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic void validate(String timeExpression) {long delay;try {delay = Long.parseLong(timeExpression);} catch (Exception e) {throw new PowerJobException("invalid timeExpression!");}// 默认 120s ,超过这个限制应该考虑使用其他类型以减少资源占用int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));if (delay > maxInterval) {throw new PowerJobException("the delay must be less than " + maxInterval + "ms");}if (delay <= 0) {throw new PowerJobException("the delay must be greater than 0 ms");}}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.FIXED_DELAY;}
}
FixedDelayTimingStrategyHandler继承了AbstractTimingStrategyHandler,其validate要求delay大于0且小于等于120s;其supportType返回的是TimeExpressionType.FIXED_DELAY
WorkflowTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java
@Component
public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.WORKFLOW;}
}
WorkflowTimingStrategyHandler继承了AbstractTimingStrategyHandler,其supportType返回的是TimeExpressionType.WORKFLOW
CronTimingStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java
@Component
public class CronTimingStrategyHandler implements TimingStrategyHandler {private final CronParser cronParser;/*** @see CronDefinitionBuilder#instanceDefinitionFor* <p>* Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter.* https://github.com/PowerJob/PowerJob/issues/382*/public CronTimingStrategyHandler() {CronDefinition cronDefinition = CronDefinitionBuilder.defineCron().withSeconds().withValidRange(0, 59).and().withMinutes().withValidRange(0, 59).and().withHours().withValidRange(0, 23).and().withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and().withMonth().withValidRange(1, 12).and().withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and().withYear().withValidRange(1970, 2099).withStrictRange().optional().and().instance();this.cronParser = new CronParser(cronDefinition);}@Overridepublic void validate(String timeExpression) {cronParser.parse(timeExpression);}@Overridepublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {Cron cron = cronParser.parse(timeExpression);ExecutionTime executionTime = ExecutionTime.forCron(cron);if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {// 需要计算出离 startTime 最近的一次真正的触发时间Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);}Instant instant = Instant.ofEpochMilli(preTriggerTime);ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);if (opt.isPresent()) {long nextTriggerTime = opt.get().toEpochSecond() * 1000;if (endTime != null && endTime < nextTriggerTime) {return null;}return nextTriggerTime;}return null;}@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.CRON;}
}
CronTimingStrategyHandler实现了TimingStrategyHandler接口,其构造器先创建了CronDefinition,再根据CronDefinition创建CronParser;其validate方法调用了cronParser.parse(timeExpression);calculateNextTriggerTime方法先解析timeExpression,再解析为ExecutionTime,再根据startTime和preTriggerTime计算新的preTriggerTime;最后通过executionTime.nextExecution计算nextTriggerTime;其supportType返回的是TimeExpressionType.CRON
DailyTimeIntervalStrategyHandler
tech/powerjob/server/core/scheduler/auxiliary/impl/DailyTimeIntervalStrategyHandler.java
@Component
public class DailyTimeIntervalStrategyHandler implements TimingStrategyHandler {/*** 使用中国星期!!!*/private static final Set<Integer> ALL_DAY = Sets.newHashSet(1, 2, 3, 4, 5, 6, 7);@Overridepublic TimeExpressionType supportType() {return TimeExpressionType.DAILY_TIME_INTERVAL;}@Override@SneakyThrowspublic void validate(String timeExpression) {DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);CommonUtils.requireNonNull(ep.interval, "interval can't be null or empty in DailyTimeIntervalExpress");CommonUtils.requireNonNull(ep.startTimeOfDay, "startTimeOfDay can't be null or empty in DailyTimeIntervalExpress");CommonUtils.requireNonNull(ep.endTimeOfDay, "endTimeOfDay can't be null or empty in DailyTimeIntervalExpress");TimeOfDay startTime = TimeOfDay.from(ep.startTimeOfDay);TimeOfDay endTime = TimeOfDay.from(ep.endTimeOfDay);if (endTime.before(startTime)) {throw new IllegalArgumentException("endTime should after startTime!");}if (StringUtils.isNotEmpty(ep.intervalUnit)) {TimeUnit.valueOf(ep.intervalUnit);}}@Override@SneakyThrowspublic Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);// 未开始状态下,用起点算调度时间if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {return calculateInRangeTime(startTime, ep);}// 间隔时间TimeUnit timeUnit = Optional.ofNullable(ep.intervalUnit).map(TimeUnit::valueOf).orElse(TimeUnit.SECONDS);long interval = timeUnit.toMillis(ep.interval);Long ret = calculateInRangeTime(preTriggerTime + interval, ep);if (ret == null || ret <= Optional.ofNullable(endTime).orElse(Long.MAX_VALUE)) {return ret;}return null;}/*** 计算最近一次在范围中的时间* @param time 当前时间基准,可能直接返回该时间作为结果* @param ep 表达式* @return 最近一次在范围中的时间*/static Long calculateInRangeTime(Long time, DailyTimeIntervalExpress ep) {Calendar calendar = Calendar.getInstance();calendar.setTime(new Date(time));int year = calendar.get(Calendar.YEAR);// 月份 + 1,转为熟悉的 1~12 月int month = calendar.get(Calendar.MONTH) + 1;int day = calendar.get(Calendar.DAY_OF_MONTH);// 判断是否符合"日"的执行条件int week = TimeUtils.calculateWeek(year, month, day);Set<Integer> targetDays = CollectionUtils.isEmpty(ep.daysOfWeek) ? ALL_DAY : ep.daysOfWeek;// 未包含情况下,将时间改写为符合条件日的 00:00 分,重新开始递归(这部分应该有性能更优的写法,不过这个调度模式应该很难触发瓶颈,先简单好用的实现)if (!targetDays.contains(week)) {simpleSetCalendar(calendar, 0, 0, 0);Date tomorrowZero = DateUtils.addDays(calendar.getTime(), 1);return calculateInRangeTime(tomorrowZero.getTime(), ep);}// 范围的开始时间TimeOfDay rangeStartTime = TimeOfDay.from(ep.startTimeOfDay);simpleSetCalendar(calendar, rangeStartTime.getHour(), rangeStartTime.getMinute(), rangeStartTime.getSecond());long todayStartTs = calendar.getTimeInMillis();// 未开始if (time < todayStartTs) {return todayStartTs;}TimeOfDay rangeEndTime = TimeOfDay.from(ep.endTimeOfDay);simpleSetCalendar(calendar, rangeEndTime.getHour(), rangeEndTime.getMinute(), rangeEndTime.getSecond());long todayEndTs = calendar.getTimeInMillis();// 范围之间if (time <= todayEndTs) {return time;}// 已结束,重新计算第二天时间simpleSetCalendar(calendar, 0, 0, 0);return calculateInRangeTime(DateUtils.addDays(calendar.getTime(), 1).getTime(), ep);}//......
}
DailyTimeIntervalStrategyHandler实现了TimingStrategyHandler接口,其supportType返回的是TimeExpressionType.DAILY_TIME_INTERVAL;其validate方法先解析参数为DailyTimeIntervalExpress,然后校验其endTime不能比startTime小;其calculateNextTriggerTime方法主要是通过calculateInRangeTime来计算最近一次在范围中的时间
TimingStrategyService
tech/powerjob/server/core/scheduler/TimingStrategyService.java
@Slf4j
@Service
public class TimingStrategyService {private static final int NEXT_N_TIMES = 5;private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {// initstrategyContainer = new EnumMap<>(TimeExpressionType.class);for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);}}/*** 计算接下来几次的调度时间** @param timeExpressionType 定时表达式类型* @param timeExpression 表达式* @param startTime 起始时间(include)* @param endTime 结束时间(include)* @return 调度时间列表*/public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType);List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES);Long nextTriggerTime = System.currentTimeMillis();do {nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime);if (nextTriggerTime == null) {break;}triggerTimeList.add(nextTriggerTime);} while (triggerTimeList.size() < NEXT_N_TIMES);if (triggerTimeList.isEmpty()) {return TIPS;}return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());}/*** 计算下次的调度时间** @param preTriggerTime 上次触发时间(nullable)* @param timeExpressionType 定时表达式类型* @param timeExpression 表达式* @param startTime 起始时间(include)* @param endTime 结束时间(include)* @return 下次的调度时间*/public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {preTriggerTime = System.currentTimeMillis();}return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);}/*** 计算下次的调度时间并检查校验规则** @param timeExpressionType 定时表达式类型* @param timeExpression 表达式* @param startTime 起始时间(include)* @param endTime 结束时间(include)* @return 下次的调度时间*/public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime);if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {throw new PowerJobException("time expression is out of date: " + timeExpression);}return nextTriggerTime;}public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {if (endTime != null) {if (endTime <= System.currentTimeMillis()) {throw new PowerJobException("lifecycle is out of date!");}if (startTime != null && startTime > endTime) {throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");}}getHandler(timeExpressionType).validate(timeExpression);}private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);if (timingStrategyHandler == null) {throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);}return timingStrategyHandler;}}
TimingStrategyService的构造器遍历timingStrategyHandlers,然后根据其supportType构建
Map<TimeExpressionType, TimingStrategyHandler>
;其calculateNextTriggerTimes先根据timeExpressionType获取到对应的TimingStrategyHandler,再循环调用TimingStrategyHandler.calculateNextTriggerTime方法来计算nextTriggerTime,最后返回最近5次的调度时间;calculateNextTriggerTimeWithInspection方法会计算nextTriggerTime并针对CRON及DAILY_TIME_INTERVAL类型的要求其不能为null;validate方法调用的是对应TimingStrategyHandler的validate方法
小结
TimingStrategyHandler接口定义了validate、calculateNextTriggerTime、supportType方法;其支持的TimeExpressionType枚举定义了API、CRON、FIXED_RATE、FIXED_DELAY、WORKFLOW、DAILY_TIME_INTERVAL几种类型,分别对应了ApiTimingStrategyHandler、CronTimingStrategyHandler、FixedRateTimingStrategyHandler、FixedDelayTimingStrategyHandler、WorkflowTimingStrategyHandler、DailyTimeIntervalStrategyHandler;TimingStrategyService则聚合了这些TimingStrategyHandler,对外提供了calculateNextTriggerTimes、calculateNextTriggerTime、calculateNextTriggerTimeWithInspection、validate方法。
这篇关于聊聊PowerJob的TimingStrategyHandler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!