Java 源码 - DelayQueue 源码解析

2024-04-30 14:28
文章标签 java 源码 解析 delayqueue

本文主要是介绍Java 源码 - DelayQueue 源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 1. 整体设计
          • 1.1 类注释
          • 1.2、类图
          • 1.3 延迟队列的属性
          • 1.4 DelayQueue 的主要方法
            • 1.4.1 offer 添加元素
            • 1.4.2 take 取出元素
            • 1.4.3 poll 取出元素

1. 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 无界延时队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被 take;
  3. 不允许空元素。
1.2、类图

DelayQueue 的类图,关键是 DelayQueue 类上是有泛型的,如下:
image.png

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。
compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。

1.3 延迟队列的属性

DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式.

1.4 DelayQueue 的主要方法
1.4.1 offer 添加元素
public boolean offer(E e) {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 向优先队列中插入元素q.offer(e);// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 if (q.peek() == e) {leader = null;available.signal();}return true;} finally {// 释放全局独占锁lock.unlock();}}
leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

1.4.2 take 取出元素

take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
一开始看到全局独占锁,理所当然详情属于队列消费模式。 无法理解 “领导者-追随者模式”。take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。 如果多个线程调用take() 方法, 当available.awaitNanos(delay);的时候, 其它线程可以抢锁进入。 下面有测试例子。源码中:java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject.await()和await(long time, TimeUnit unit); 方法 Node node = addConditionWaiter(); long savedState = fullyRelease(node); 队列状态释放
take方法主要实现逻辑为(for循环体):
1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。

public E take() throws InterruptedException {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 获取队头元素,peek 方法不会删除元素E first = q.peek();if (first == null)// 若队头为空,则阻塞当前线程available.await();else {// 否则获取队头元素的超时时间long delay = first.getDelay(NANOSECONDS);// 已超时,直接出队if (delay <= 0)return q.poll();// 释放 first 的引用,避免内存泄漏first = null; // don't retain ref while waiting// leader != null 表明有其他线程在操作,阻塞当前线程if (leader != null)available.await();else {// leader 指向当前线程Thread thisThread = Thread.currentThread();leader = thisThread;try {// 超时阻塞available.awaitNanos(delay);} finally {// 释放 leaderif (leader == thisThread)leader = null;}}}}} finally {// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)available.signal();// 释放全局独占锁    lock.unlock();}
}

Condition.await() 和Condition.await(100, TimeUnit.SECONDS); 方法进入等待时候,其它线程可以抢抢到锁

package com.lvyuanj.test.timer;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
public class TestConditionAwait {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {final Condition condition = lock.newCondition();final Thread thread1 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {Thread.currentThread().setName("ConditionAwait");log.error(Thread.currentThread().getName() + " beforeAwaitTime:" + System.currentTimeMillis());condition.await(100, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error(Thread.currentThread().getName() + " finishAwaitTime:" + System.currentTimeMillis());} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});Thread thread2 = new Thread(new Runnable() {@Overridepublic void run() {Thread.currentThread().setName("ConditionSignal");try {lock.lock();log.error(Thread.currentThread().getName() + " getLockTime:" + System.currentTimeMillis());//thread1.interrupt();long currentTime = System.currentTimeMillis();while (System.currentTimeMillis() - currentTime < 8000) {}condition.signal();log.error(Thread.currentThread().getName() + " signalTime:" + System.currentTimeMillis());} catch (Exception e) {} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});thread1.start();Thread.sleep(50);thread2.start();}
}
1.4.3 poll 取出元素

取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}
}

这篇关于Java 源码 - DelayQueue 源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949109

相关文章

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码

SpringBoot利用@Validated注解优雅实现参数校验

《SpringBoot利用@Validated注解优雅实现参数校验》在开发Web应用时,用户输入的合法性校验是保障系统稳定性的基础,​SpringBoot的@Validated注解提供了一种更优雅的解... 目录​一、为什么需要参数校验二、Validated 的核心用法​1. 基础校验2. php分组校验3

Java Predicate接口定义详解

《JavaPredicate接口定义详解》Predicate是Java中的一个函数式接口,它代表一个判断逻辑,接收一个输入参数,返回一个布尔值,:本文主要介绍JavaPredicate接口的定义... 目录Java Predicate接口Java lamda表达式 Predicate<T>、BiFuncti

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

Spring Security方法级安全控制@PreAuthorize注解的灵活运用小结

《SpringSecurity方法级安全控制@PreAuthorize注解的灵活运用小结》本文将带着大家讲解@PreAuthorize注解的核心原理、SpEL表达式机制,并通过的示例代码演示如... 目录1. 前言2. @PreAuthorize 注解简介3. @PreAuthorize 核心原理解析拦截与

一文详解JavaScript中的fetch方法

《一文详解JavaScript中的fetch方法》fetch函数是一个用于在JavaScript中执行HTTP请求的现代API,它提供了一种更简洁、更强大的方式来处理网络请求,:本文主要介绍Jav... 目录前言什么是 fetch 方法基本语法简单的 GET 请求示例代码解释发送 POST 请求示例代码解释

Java图片压缩三种高效压缩方案详细解析

《Java图片压缩三种高效压缩方案详细解析》图片压缩通常涉及减少图片的尺寸缩放、调整图片的质量(针对JPEG、PNG等)、使用特定的算法来减少图片的数据量等,:本文主要介绍Java图片压缩三种高效... 目录一、基于OpenCV的智能尺寸压缩技术亮点:适用场景:二、JPEG质量参数压缩关键技术:压缩效果对比

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++