dubbo之时间轮算法分析

2024-09-01 16:20
文章标签 算法 分析 时间 dubbo

本文主要是介绍dubbo之时间轮算法分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、参数说明
  • 二、具体实现
    • 1、HashedWheelTimer
    • 2、createWheel
    • 3、newTimeout
    • 4、start
    • 5、run
    • 6、waitForNextTick
    • 7、transferTimeoutsToBuckets
    • 8、expireTimeouts
  • 总结


前言

时间轮(TimingWheel)是一种高效利用线程资源进行批量化调度的算法,广泛应用于各种操作系统的定时任务调度中,如Linux的crontab,以及Java开发中常用的Dubbo、Netty、Kafka等框架。时间轮的核心思想是将时间划分为若干个时间间隔(bucket),每个时间间隔代表一个时间段,并通过一个循环的数据结构(类似于时钟)来管理这些时间间隔。


一、参数说明

在这里插入图片描述

  • tickDuration:表示一个槽所代表的时间范围 默认100ms
  • ticksPerWheel:表示该时间轮有多少个槽 默认512
  • startTime:表示该时间轮的开始时间
  • interval:时间轮所能表示的时间跨度,也就是 tickDuration * ticksPerWheel
  • currentTime:表示当前时间,也就是时间轮指针指向的时间
  • wheel:表示TimerTaskList的数组,即各个槽,每个bucket都是一个 HashedWheelBucket 。
  • HashedWheelBucket:存储TimerTaskEntry的双向链表
  • HashedWheelTimeout:延迟任务,有两个值 deadline 和 remainingRounds
  • deadline:TimerTask 最后的执行时间
  • remainingRounds:剩余圈数
  • timeouts:用来保存新增的HashedWheelTimeout,每次执行会拿出10W个放入HashedWheelBucket

二、具体实现

1、HashedWheelTimer

时间轮实现类

	public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel,long maxPendingTimeouts) {// 检查参数if (threadFactory == null) {throw new NullPointerException("threadFactory");}if (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// Normalize ticksPerWheel to power of two and initialize the wheel.// 创建时间轮wheel = createWheel(ticksPerWheel);// 位运算标识 // 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline& mast == deadline % wheel.length    // deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODOmask = wheel.length - 1;// Convert tickDuration to nanos.// 时间轮的基本时间跨度,转成最小时间单位Nanosthis.tickDuration = unit.toNanos(tickDuration);// Prevent overflow.// 时间跨度限制不能太大,计算会有问题if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}// 创建时间轮工作线程workerThread = threadFactory.newThread(worker);this.maxPendingTimeouts = maxPendingTimeouts;// 延迟任务太多的时间,警告日志if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}

参数说明:

  • threadFactory
    线程工厂,创建时间轮线程
  • tickDuration
    每一tick的时间
  • timeUnit
    tickDuration的时间单位
  • ticksPerWheel
    就是轮子一共有多个格子,即要多少个tick才能走完这个wheel一圈。

2、createWheel

创建时间轮

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}if (ticksPerWheel > 1073741824) {throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);}// 如果不是2^n 则调整为2^nticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);// 初始化时间轮槽HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i++) {wheel[i] = new HashedWheelBucket();}return wheel;}

3、newTimeout

添加新任务

	public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();// 如果任务大于最大量,则不允许继续添加if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 启动时间轮工作线程start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.// 如果为负数,那么说明超过了long的最大值if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}

4、start

启动时间轮

    public void start() {switch (WORKER_STATE_UPDATER.get(this)) {// 如果是初始化 启动实践论case WORKER_STATE_INIT:// 保证只启动一次,原子性if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;// 已经启动过了case WORKER_STATE_STARTED:break;// 时间轮停止了case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// Wait until the startTime is initialized by the worker.// 线程启动执行任务是异步的,这里是等待workerThread.start(),线程已经启动了while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}

5、run

workerThread.start()启动后,会执行Worker的run方法

public void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().// 唤醒被阻塞的start()方法。startTimeInitialized.countDown();do {// 等下一个槽的到达时间,开始执行上一个槽的任务 TODO 不明白这里的设计,哪位大佬知道可以指点一下final long deadline = waitForNextTick();if (deadline > 0) {// 计算时间轮的槽位int idx = (int) (tick & mask);// 移除取消的了taskprocessCancelledTasks();HashedWheelBucket bucket = wheel[idx];// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中transferTimeoutsToBuckets();// 运行目前指针指向的槽中的bucket链表中的任务,执行到期的延时任务bucket.expireTimeouts(deadline);tick++;}} // 如果Worker_State一只是started状态,就一直循环while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket : wheel) {// 清除时间轮中超时未处理的任务bucket.clearTimeouts(unprocessedTimeouts);}for (; ; ) {// 遍历任务队列,发现如果有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中。HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 再次移除取消的了taskprocessCancelledTasks();
}

6、waitForNextTick

一个钟表上的间隔是代表一个单位时间的间隔,那么waitForNextTick就是根据当前时间计算出跳动到下个时间的时间间隔,然后进行sleep,结束后进入下一个时间间隔,下一个间隔到来的时候返回。

	/*** 根据startTime和当前槽位计算目标nanoTime,* 等待时间到达** @return Long.MIN_VALUE if received a shutdown request,* current time otherwise (with Long.MIN_VALUE changed by +1)*/private long waitForNextTick() {// tick槽位,tickDuration表示每个时间格的跨度,所以deadline返回的是下一次时间轮指针跳动的时间long deadline = tickDuration * (tick + 1);for (; ; ) {// 计算当前时间距离启动时间的时间间隔,期间休眠final long currentTime = System.nanoTime() - startTime;// 计算sleepTimeMs先加999999,应该是不足1ms的,补足1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// sleepTimeMs小于零表示走到了下一个时间槽位置if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// Windows 时间换算if (isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {// 当前时间距离下一次tick时间还有一段距离,需要sleepThread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}

7、transferTimeoutsToBuckets

转移任务到时间轮的具体槽中

	// 将延时任务队列中等待添加到时间轮中的延时任务转移到时间轮的指定位置private void transferTimeoutsToBuckets() {// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just// adds new timeouts in a loop.// 循环100000次,也就是每次转移10w个任务// 为了防止这个操作销毁太多时间,导致更多的任务时间不准,因此一次最多操作10w个for (int i = 0; i < 100000; i++) {// 从阻塞队列中获得具体的任务HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}// 计算tick次数,deadline表示当前任务的延迟时间,tickDuration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行long calculated = timeout.deadline / tickDuration;// 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行)timeout.remainingRounds = (calculated - tick) / wheel.length;// Ensure we don't schedule for past.// 如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行final long ticks = Math.max(calculated, tick);// 算出任务应该插入的 wheel 的 slot, stopIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);// 把timeout任务插入到指定的bucket链中。HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}}

8、expireTimeouts

当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现

	void expireTimeouts(long deadline) {// 双向链表HashedWheelTimeout timeout = head;// process all timeouts// 遍历当前时间槽中的所有任务while (timeout != null) {HashedWheelTimeout next = timeout.next;// 如果当前任务要被执行,那么remainingRounds应该小于或者等于0if (timeout.remainingRounds <= 0) {// 从bucket链表中移除当前timeout,并返回链表中下一个timeoutnext = remove(timeout);// 如果timeout的时间小于当前的时间,那么就调用expire执行taskif (timeout.deadline <= deadline) {timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.// 不可能发生的情况,就是说round已经为0了,deadline却 > 当前槽的deadlinethrow new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {// 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一timeout.remainingRounds--;}// 把指针放置到下一个timeouttimeout = next;}}

总结

时间轮(TimingWheel)在计算机科学中,特别是在任务调度和时间管理方面,具有重要的意义,我们可以结合业务进行使用

  • 节省cpu资源

  • 易于实现和维护

  • 批量化调度模型

  • 高效处理大量定时任务

  • 灵活适应不同应用场景

这篇关于dubbo之时间轮算法分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1127440

相关文章

不懂推荐算法也能设计推荐系统

本文以商业化应用推荐为例,告诉我们不懂推荐算法的产品,也能从产品侧出发, 设计出一款不错的推荐系统。 相信很多新手产品,看到算法二字,多是懵圈的。 什么排序算法、最短路径等都是相对传统的算法(注:传统是指科班出身的产品都会接触过)。但对于推荐算法,多数产品对着网上搜到的资源,都会无从下手。特别当某些推荐算法 和 “AI”扯上关系后,更是加大了理解的难度。 但,不了解推荐算法,就无法做推荐系

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

康拓展开(hash算法中会用到)

康拓展开是一个全排列到一个自然数的双射(也就是某个全排列与某个自然数一一对应) 公式: X=a[n]*(n-1)!+a[n-1]*(n-2)!+...+a[i]*(i-1)!+...+a[1]*0! 其中,a[i]为整数,并且0<=a[i]<i,1<=i<=n。(a[i]在不同应用中的含义不同); 典型应用: 计算当前排列在所有由小到大全排列中的顺序,也就是说求当前排列是第

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

poj 3974 and hdu 3068 最长回文串的O(n)解法(Manacher算法)

求一段字符串中的最长回文串。 因为数据量比较大,用原来的O(n^2)会爆。 小白上的O(n^2)解法代码:TLE啦~ #include<stdio.h>#include<string.h>const int Maxn = 1000000;char s[Maxn];int main(){char e[] = {"END"};while(scanf("%s", s) != EO

秋招最新大模型算法面试,熬夜都要肝完它

💥大家在面试大模型LLM这个板块的时候,不知道面试完会不会复盘、总结,做笔记的习惯,这份大模型算法岗面试八股笔记也帮助不少人拿到过offer ✨对于面试大模型算法工程师会有一定的帮助,都附有完整答案,熬夜也要看完,祝大家一臂之力 这份《大模型算法工程师面试题》已经上传CSDN,还有完整版的大模型 AI 学习资料,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

dp算法练习题【8】

不同二叉搜索树 96. 不同的二叉搜索树 给你一个整数 n ,求恰由 n 个节点组成且节点值从 1 到 n 互不相同的 二叉搜索树 有多少种?返回满足题意的二叉搜索树的种数。 示例 1: 输入:n = 3输出:5 示例 2: 输入:n = 1输出:1 class Solution {public int numTrees(int n) {int[] dp = new int