聊聊PowerJob的HashedWheelTimer

2024-01-22 17:36

本文主要是介绍聊聊PowerJob的HashedWheelTimer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的HashedWheelTimer

Timer

tech/powerjob/server/common/timewheel/Timer.java

public interface Timer {/*** 调度定时任务*/TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);/*** 停止所有调度任务*/Set<TimerTask> stop();
}

Timer接口定义了schedule方法,用于在指定时间之后调度TimerTask,它返回TimerFuture;stop方法返回未处理的TimerTask

TimerTask

@FunctionalInterface
public interface TimerTask extends Runnable {
}

TimerTask继承了Runnable

TimerFuture

tech/powerjob/server/common/timewheel/TimerFuture.java

public interface TimerFuture {TimerTask getTask();boolean cancel();boolean isCancelled();boolean isDone();
}            

TimerFuture定于了getTask、cancel、isCancelled、isDone方法

HashedWheelTimer

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

@Slf4j
public class HashedWheelTimer implements Timer {private final long tickDuration;private final HashedWheelBucket[] wheel;private final int mask;private final Indicator indicator;private final long startTime;private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();private final ExecutorService taskProcessPool;public HashedWheelTimer(long tickDuration, int ticksPerWheel) {this(tickDuration, ticksPerWheel, 0);}/*** 新建时间轮定时器* @param tickDuration 时间间隔,单位毫秒(ms)* @param ticksPerWheel 轮盘个数* @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)*/public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {this.tickDuration = tickDuration;// 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余int ticksNum = CommonUtils.formatSize(ticksPerWheel);wheel = new HashedWheelBucket[ticksNum];for (int i = 0; i < ticksNum; i++) {wheel[i] = new HashedWheelBucket();}mask = wheel.length - 1;// 初始化执行线程池if (processThreadNum <= 0) {taskProcessPool = null;}else {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();// 这里需要调整一下队列大小BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);// 基本都是 io 密集型任务taskProcessPool = new ThreadPoolExecutor(core, 2 * core,60, TimeUnit.SECONDS,queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));}startTime = System.currentTimeMillis();// 启动后台线程indicator = new Indicator();new Thread(indicator, "HashedWheelTimer-Indicator").start();}//......
}    

HashedWheelTimer实现了Timer接口,其构造器要求输入tickDuration、ticksPerWheel,它会将ticksPerWheel转换为2的N次,然后创建对应的HashedWheelBucket,若processThreadNum大于0则同时创建ThreadPoolExecutor用于处理任务,最后启动异步线程执行Indicator

schedule

    @Overridepublic TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {long targetTime = System.currentTimeMillis() + unit.toMillis(delay);HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);// 直接运行到期、过期任务if (delay <= 0) {runTask(timerFuture);return timerFuture;}// 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列)waitingTasks.add(timerFuture);return timerFuture;}

schedule方法先计算目标时间,然后创建对应的HashedWheelTimerFuture,若delay小于等于0则执行runTask,否则添加到waitingTasks

stop

    @Overridepublic Set<TimerTask> stop() {indicator.stop.set(true);taskProcessPool.shutdown();while (!taskProcessPool.isTerminated()) {try {Thread.sleep(100);}catch (Exception ignore) {}}return indicator.getUnprocessedTasks();}

stop方法先设置indicator的stop为true,然后执行taskProcessPool.shutdown(),等待关闭,最后返回indicator.getUnprocessedTasks()

HashedWheelBucket

    private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {public void expireTimerTasks(long currentTick) {removeIf(timerFuture -> {// processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {return true;}if (timerFuture.status != HashedWheelTimerFuture.WAITING) {log.warn("[HashedWheelTimer] impossible, please fix the bug");return true;}// 本轮直接调度if (timerFuture.totalTicks <= currentTick) {if (timerFuture.totalTicks < currentTick) {log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");}try {// 提交执行runTask(timerFuture);}catch (Exception ignore) {} finally {timerFuture.status = HashedWheelTimerFuture.FINISHED;}return true;}return false;});}}private void runTask(HashedWheelTimerFuture timerFuture) {timerFuture.status = HashedWheelTimerFuture.RUNNING;if (taskProcessPool == null) {timerFuture.timerTask.run();}else {taskProcessPool.submit(timerFuture.timerTask);}}    

HashedWheelBucket继承了LinkedList,其泛型为HashedWheelTimerFuture,它提供了expireTimerTasks方法,通过removeIf删除status为CANCELED、status不为WAITING,以及执行runTask(注意这里忽略了异常)之后标记status为FINISHED的元素;runTask先标记为RUNNING,对于taskProcessPool为null则直接执行,否则提交到taskProcessPool

HashedWheelTimerFuture

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

    private final class HashedWheelTimerFuture implements TimerFuture {// 预期执行时间private final long targetTime;private final TimerTask timerTask;// 所属的时间格,用于快速删除该任务private HashedWheelBucket bucket;// 总圈数private long totalTicks;// 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消private int status;// 状态枚举值private static final int WAITING = 0;private static final int RUNNING = 1;private static final int FINISHED = 2;private static final int CANCELED = 3;public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {this.targetTime = targetTime;this.timerTask = timerTask;this.status = WAITING;}@Overridepublic TimerTask getTask() {return timerTask;}@Overridepublic boolean cancel() {if (status == WAITING) {status = CANCELED;canceledTasks.add(this);return true;}return false;}@Overridepublic boolean isCancelled() {return status == CANCELED;}@Overridepublic boolean isDone() {return status == FINISHED;}}

HashedWheelTimerFuture实现了TimerFuture接口,它定义了WAITING、RUNNING、FINISHED、CANCELED状态;初始状态为WAITING,对于WAITING状态的可以设置为CANCELED,并添加到canceledTasks;isCancelled判断状态是不是CANCELED,isDone判断状态是不是FINISHED

getUnprocessedTasks

        public Set<TimerTask> getUnprocessedTasks() {try {latch.await();}catch (Exception ignore) {}Set<TimerTask> tasks = Sets.newHashSet();Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {if (timerFuture.status == HashedWheelTimerFuture.WAITING) {tasks.add(timerFuture.timerTask);}};waitingTasks.forEach(consumer);for (HashedWheelBucket bucket : wheel) {bucket.forEach(consumer);}return tasks;}

getUnprocessedTasks会等待Indicator的while循环结束,然后遍历所有的HashedWheelBucket找出状态还是WAITING的任务

Indicator

    private class Indicator implements Runnable {private long tick = 0;private final AtomicBoolean stop = new AtomicBoolean(false);private final CountDownLatch latch = new CountDownLatch(1);@Overridepublic void run() {while (!stop.get()) {// 1. 将任务从队列推入时间轮pushTaskToBucket();// 2. 处理取消的任务processCanceledTasks();// 3. 等待指针跳向下一刻tickTack();// 4. 执行定时任务int currentIndex = (int) (tick & mask);HashedWheelBucket bucket = wheel[currentIndex];bucket.expireTimerTasks(tick);tick ++;}latch.countDown();}/*** 模拟指针转动,当返回时指针已经转到了下一个刻度*/private void tickTack() {// 下一次调度的绝对时间long nextTime = startTime + (tick + 1) * tickDuration;long sleepTime = nextTime - System.currentTimeMillis();if (sleepTime > 0) {try {Thread.sleep(sleepTime);}catch (Exception ignore) {}}}/*** 处理被取消的任务*/private void processCanceledTasks() {while (true) {HashedWheelTimerFuture canceledTask = canceledTasks.poll();if (canceledTask == null) {return;}// 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)if (canceledTask.bucket != null) {canceledTask.bucket.remove(canceledTask);}}}/*** 将队列中的任务推入时间轮中*/private void pushTaskToBucket() {while (true) {HashedWheelTimerFuture timerTask = waitingTasks.poll();if (timerTask == null) {return;}// 总共的偏移量long offset = timerTask.targetTime - startTime;// 总共需要走的指针步数timerTask.totalTicks = offset / tickDuration;// 取余计算 bucket indexint index = (int) (timerTask.totalTicks & mask);HashedWheelBucket bucket = wheel[index];// TimerTask 维护 Bucket 引用,用于删除该任务timerTask.bucket = bucket;if (timerTask.status == HashedWheelTimerFuture.WAITING) {bucket.add(timerTask);}}}public Set<TimerTask> getUnprocessedTasks() {try {latch.await();}catch (Exception ignore) {}Set<TimerTask> tasks = Sets.newHashSet();Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {if (timerFuture.status == HashedWheelTimerFuture.WAITING) {tasks.add(timerFuture.timerTask);}};waitingTasks.forEach(consumer);for (HashedWheelBucket bucket : wheel) {bucket.forEach(consumer);}return tasks;}}

Indicator实现了Runnable接口,其run方法在stop为false的时候循环执行,pushTaskToBucket、processCanceledTasks、tickTack、expireTimerTasks

pushTaskToBucket

        private void pushTaskToBucket() {while (true) {HashedWheelTimerFuture timerTask = waitingTasks.poll();if (timerTask == null) {return;}// 总共的偏移量long offset = timerTask.targetTime - startTime;// 总共需要走的指针步数timerTask.totalTicks = offset / tickDuration;// 取余计算 bucket indexint index = (int) (timerTask.totalTicks & mask);HashedWheelBucket bucket = wheel[index];// TimerTask 维护 Bucket 引用,用于删除该任务timerTask.bucket = bucket;if (timerTask.status == HashedWheelTimerFuture.WAITING) {bucket.add(timerTask);}}}

pushTaskToBucket通过waitingTasks.poll()拉取任务,若为null直接返回,否则通过timerTask.targetTime与startTime计算offset,再根据tickDuration计算需要走的步数,然后计算并获取目标HashedWheelBucket,然后将timerTask添加到bucket中

processCanceledTasks

        private void processCanceledTasks() {while (true) {HashedWheelTimerFuture canceledTask = canceledTasks.poll();if (canceledTask == null) {return;}// 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)if (canceledTask.bucket != null) {canceledTask.bucket.remove(canceledTask);}}}

processCanceledTasks会执行canceledTasks.poll()拉取canceledTask,若canceledTask.bucket不为null则将canceledTask从该bucket中移除

tickTack

        private void tickTack() {// 下一次调度的绝对时间long nextTime = startTime + (tick + 1) * tickDuration;long sleepTime = nextTime - System.currentTimeMillis();if (sleepTime > 0) {try {Thread.sleep(sleepTime);}catch (Exception ignore) {}}}

tickTack模拟指针移动,它先计算nextTime,再计算需要sleep多久,然后执行Thread.sleep(sleepTime)

小结

PowerJob定义了Timer接口,并提供了HashedWheelTimer的实现,它定义了waitingTasks、canceledTasks两个LinkedBlockingQueue(无界队列),同时还支持定义任务处理线程池的core线程数;它通过Indicator线程来处理时间轮的转动及任务处理,Indicator循环将waitingTasks的任务放入到对应的bucket,然后模拟时间轮等待,然后通过bucket.expireTimerTasks(tick)处理到期任务,最后再递增tick。

这篇关于聊聊PowerJob的HashedWheelTimer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/633711

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH