dubbo之时间轮算法分析

2024-09-01 16:20
文章标签 算法 分析 时间 dubbo

本文主要是介绍dubbo之时间轮算法分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、参数说明
  • 二、具体实现
    • 1、HashedWheelTimer
    • 2、createWheel
    • 3、newTimeout
    • 4、start
    • 5、run
    • 6、waitForNextTick
    • 7、transferTimeoutsToBuckets
    • 8、expireTimeouts
  • 总结


前言

时间轮(TimingWheel)是一种高效利用线程资源进行批量化调度的算法,广泛应用于各种操作系统的定时任务调度中,如Linux的crontab,以及Java开发中常用的Dubbo、Netty、Kafka等框架。时间轮的核心思想是将时间划分为若干个时间间隔(bucket),每个时间间隔代表一个时间段,并通过一个循环的数据结构(类似于时钟)来管理这些时间间隔。


一、参数说明

在这里插入图片描述

  • tickDuration:表示一个槽所代表的时间范围 默认100ms
  • ticksPerWheel:表示该时间轮有多少个槽 默认512
  • startTime:表示该时间轮的开始时间
  • interval:时间轮所能表示的时间跨度,也就是 tickDuration * ticksPerWheel
  • currentTime:表示当前时间,也就是时间轮指针指向的时间
  • wheel:表示TimerTaskList的数组,即各个槽,每个bucket都是一个 HashedWheelBucket 。
  • HashedWheelBucket:存储TimerTaskEntry的双向链表
  • HashedWheelTimeout:延迟任务,有两个值 deadline 和 remainingRounds
  • deadline:TimerTask 最后的执行时间
  • remainingRounds:剩余圈数
  • timeouts:用来保存新增的HashedWheelTimeout,每次执行会拿出10W个放入HashedWheelBucket

二、具体实现

1、HashedWheelTimer

时间轮实现类

	public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel,long maxPendingTimeouts) {// 检查参数if (threadFactory == null) {throw new NullPointerException("threadFactory");}if (unit == null) {throw new NullPointerException("unit");}if (tickDuration <= 0) {throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);}if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}// Normalize ticksPerWheel to power of two and initialize the wheel.// 创建时间轮wheel = createWheel(ticksPerWheel);// 位运算标识 // 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline& mast == deadline % wheel.length    // deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODOmask = wheel.length - 1;// Convert tickDuration to nanos.// 时间轮的基本时间跨度,转成最小时间单位Nanosthis.tickDuration = unit.toNanos(tickDuration);// Prevent overflow.// 时间跨度限制不能太大,计算会有问题if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d",tickDuration, Long.MAX_VALUE / wheel.length));}// 创建时间轮工作线程workerThread = threadFactory.newThread(worker);this.maxPendingTimeouts = maxPendingTimeouts;// 延迟任务太多的时间,警告日志if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}

参数说明:

  • threadFactory
    线程工厂,创建时间轮线程
  • tickDuration
    每一tick的时间
  • timeUnit
    tickDuration的时间单位
  • ticksPerWheel
    就是轮子一共有多个格子,即要多少个tick才能走完这个wheel一圈。

2、createWheel

创建时间轮

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {if (ticksPerWheel <= 0) {throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);}if (ticksPerWheel > 1073741824) {throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);}// 如果不是2^n 则调整为2^nticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);// 初始化时间轮槽HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i++) {wheel[i] = new HashedWheelBucket();}return wheel;}

3、newTimeout

添加新任务

	public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {if (task == null) {throw new NullPointerException("task");}if (unit == null) {throw new NullPointerException("unit");}long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();// 如果任务大于最大量,则不允许继续添加if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts ("+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "+ "timeouts (" + maxPendingTimeouts + ")");}// 启动时间轮工作线程start();// Add the timeout to the timeout queue which will be processed on the next tick.// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;// Guard against overflow.// 如果为负数,那么说明超过了long的最大值if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);timeouts.add(timeout);return timeout;}

4、start

启动时间轮

    public void start() {switch (WORKER_STATE_UPDATER.get(this)) {// 如果是初始化 启动实践论case WORKER_STATE_INIT:// 保证只启动一次,原子性if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;// 已经启动过了case WORKER_STATE_STARTED:break;// 时间轮停止了case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}// Wait until the startTime is initialized by the worker.// 线程启动执行任务是异步的,这里是等待workerThread.start(),线程已经启动了while (startTime == 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {// Ignore - it will be ready very soon.}}}

5、run

workerThread.start()启动后,会执行Worker的run方法

public void run() {// Initialize the startTime.startTime = System.nanoTime();if (startTime == 0) {// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.startTime = 1;}// Notify the other threads waiting for the initialization at start().// 唤醒被阻塞的start()方法。startTimeInitialized.countDown();do {// 等下一个槽的到达时间,开始执行上一个槽的任务 TODO 不明白这里的设计,哪位大佬知道可以指点一下final long deadline = waitForNextTick();if (deadline > 0) {// 计算时间轮的槽位int idx = (int) (tick & mask);// 移除取消的了taskprocessCancelledTasks();HashedWheelBucket bucket = wheel[idx];// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中transferTimeoutsToBuckets();// 运行目前指针指向的槽中的bucket链表中的任务,执行到期的延时任务bucket.expireTimeouts(deadline);tick++;}} // 如果Worker_State一只是started状态,就一直循环while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);// Fill the unprocessedTimeouts so we can return them from stop() method.for (HashedWheelBucket bucket : wheel) {// 清除时间轮中超时未处理的任务bucket.clearTimeouts(unprocessedTimeouts);}for (; ; ) {// 遍历任务队列,发现如果有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中。HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}// 再次移除取消的了taskprocessCancelledTasks();
}

6、waitForNextTick

一个钟表上的间隔是代表一个单位时间的间隔,那么waitForNextTick就是根据当前时间计算出跳动到下个时间的时间间隔,然后进行sleep,结束后进入下一个时间间隔,下一个间隔到来的时候返回。

	/*** 根据startTime和当前槽位计算目标nanoTime,* 等待时间到达** @return Long.MIN_VALUE if received a shutdown request,* current time otherwise (with Long.MIN_VALUE changed by +1)*/private long waitForNextTick() {// tick槽位,tickDuration表示每个时间格的跨度,所以deadline返回的是下一次时间轮指针跳动的时间long deadline = tickDuration * (tick + 1);for (; ; ) {// 计算当前时间距离启动时间的时间间隔,期间休眠final long currentTime = System.nanoTime() - startTime;// 计算sleepTimeMs先加999999,应该是不足1ms的,补足1mslong sleepTimeMs = (deadline - currentTime + 999999) / 1000000;// sleepTimeMs小于零表示走到了下一个时间槽位置if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}// Windows 时间换算if (isWindows()) {sleepTimeMs = sleepTimeMs / 10 * 10;}try {// 当前时间距离下一次tick时间还有一段距离,需要sleepThread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}

7、transferTimeoutsToBuckets

转移任务到时间轮的具体槽中

	// 将延时任务队列中等待添加到时间轮中的延时任务转移到时间轮的指定位置private void transferTimeoutsToBuckets() {// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just// adds new timeouts in a loop.// 循环100000次,也就是每次转移10w个任务// 为了防止这个操作销毁太多时间,导致更多的任务时间不准,因此一次最多操作10w个for (int i = 0; i < 100000; i++) {// 从阻塞队列中获得具体的任务HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {// all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {// Was cancelled in the meantime.continue;}// 计算tick次数,deadline表示当前任务的延迟时间,tickDuration表示时间槽的间隔,两者相除就可以计算当前任务需要tick几次才能被执行long calculated = timeout.deadline / tickDuration;// 计算剩余的轮数, 只有 timer 走够轮数, 并且到达了 task 所在的 slot, task 才会过期.(被执行)timeout.remainingRounds = (calculated - tick) / wheel.length;// Ensure we don't schedule for past.// 如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行final long ticks = Math.max(calculated, tick);// 算出任务应该插入的 wheel 的 slot, stopIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);// 把timeout任务插入到指定的bucket链中。HashedWheelBucket bucket = wheel[stopIndex];bucket.addTimeout(timeout);}}

8、expireTimeouts

当指针跳动到某一个时间槽中时,会就触发这个槽中的任务的执行。该功能是通过expireTimeouts来实现

	void expireTimeouts(long deadline) {// 双向链表HashedWheelTimeout timeout = head;// process all timeouts// 遍历当前时间槽中的所有任务while (timeout != null) {HashedWheelTimeout next = timeout.next;// 如果当前任务要被执行,那么remainingRounds应该小于或者等于0if (timeout.remainingRounds <= 0) {// 从bucket链表中移除当前timeout,并返回链表中下一个timeoutnext = remove(timeout);// 如果timeout的时间小于当前的时间,那么就调用expire执行taskif (timeout.deadline <= deadline) {timeout.expire();} else {// The timeout was placed into a wrong slot. This should never happen.// 不可能发生的情况,就是说round已经为0了,deadline却 > 当前槽的deadlinethrow new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next = remove(timeout);} else {// 因为当前的槽位已经过了,说明已经走了一圈了,把轮数减一timeout.remainingRounds--;}// 把指针放置到下一个timeouttimeout = next;}}

总结

时间轮(TimingWheel)在计算机科学中,特别是在任务调度和时间管理方面,具有重要的意义,我们可以结合业务进行使用

  • 节省cpu资源

  • 易于实现和维护

  • 批量化调度模型

  • 高效处理大量定时任务

  • 灵活适应不同应用场景

这篇关于dubbo之时间轮算法分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1127440

相关文章

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

SpringBoot实现MD5加盐算法的示例代码

《SpringBoot实现MD5加盐算法的示例代码》加盐算法是一种用于增强密码安全性的技术,本文主要介绍了SpringBoot实现MD5加盐算法的示例代码,文中通过示例代码介绍的非常详细,对大家的学习... 目录一、什么是加盐算法二、如何实现加盐算法2.1 加盐算法代码实现2.2 注册页面中进行密码加盐2.

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

Java时间轮调度算法的代码实现

《Java时间轮调度算法的代码实现》时间轮是一种高效的定时调度算法,主要用于管理延时任务或周期性任务,它通过一个环形数组(时间轮)和指针来实现,将大量定时任务分摊到固定的时间槽中,极大地降低了时间复杂... 目录1、简述2、时间轮的原理3. 时间轮的实现步骤3.1 定义时间槽3.2 定义时间轮3.3 使用时

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

Spring、Spring Boot、Spring Cloud 的区别与联系分析

《Spring、SpringBoot、SpringCloud的区别与联系分析》Spring、SpringBoot和SpringCloud是Java开发中常用的框架,分别针对企业级应用开发、快速开... 目录1. Spring 框架2. Spring Boot3. Spring Cloud总结1. Sprin

Python如何获取域名的SSL证书信息和到期时间

《Python如何获取域名的SSL证书信息和到期时间》在当今互联网时代,SSL证书的重要性不言而喻,它不仅为用户提供了安全的连接,还能提高网站的搜索引擎排名,那我们怎么才能通过Python获取域名的S... 目录了解SSL证书的基本概念使用python库来抓取SSL证书信息安装必要的库编写获取SSL证书信息

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory