Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析

本文主要是介绍Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • DelayQueue概述
    • 类图及重要字段
    • Delayed接口
    • Delayed元素案例
    • 构造器
    • void put(E e)
    • E take()
      • first = null 有什么用
    • 总结
    • 参考阅读

系列传送门:

  • Java并发包源码学习系列:AbstractQueuedSynchronizer
  • Java并发包源码学习系列:CLH同步队列及同步资源获取与释放
  • Java并发包源码学习系列:AQS共享式与独占式获取与释放资源的区别
  • Java并发包源码学习系列:ReentrantLock可重入独占锁详解
  • Java并发包源码学习系列:ReentrantReadWriteLock读写锁解析
  • Java并发包源码学习系列:详解Condition条件队列、signal和await
  • Java并发包源码学习系列:挂起与唤醒线程LockSupport工具类
  • Java并发包源码学习系列:JDK1.8的ConcurrentHashMap源码解析
  • Java并发包源码学习系列:阻塞队列BlockingQueue及实现原理分析
  • Java并发包源码学习系列:阻塞队列实现之ArrayBlockingQueue源码解析
  • Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析
  • Java并发包源码学习系列:阻塞队列实现之PriorityBlockingQueue源码解析

DelayQueue概述

DelayQueue是一个支持延时获取元素的无界阻塞队列,使用PriorityQueue来存储元素。

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,需要实现compareTo方法】,每个元素都需要指明过期时间,通过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法作为排序规则。

当从队列获取元素时,只有过期的元素才会出队列。

使用场景: 缓存系统设计、定时任务调度等。

类图及重要字段

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {// 独占锁实现同步private final transient ReentrantLock lock = new ReentrantLock();// 优先队列存放数据private final PriorityQueue<E> q = new PriorityQueue<E>();/*** 基于Leader-Follower模式的变体,用于尽量减少不必要的线程等待*/private Thread leader = null;/*** 与lock对应的条件变量*/private final Condition available = lock.newCondition();    
}
  1. 使用ReentrantLock独占锁实现线程同步,使用Condition实现等待通知机制。
  2. 基于Leader-Follower模式的变体,减少不必要的线程等待。
  3. 内部使用PriorityQueue优先级队列存储元素,且队列中元素必须实现Delayed接口。

Delayed接口

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,需要实现compareTo方法】,每个元素都需要指明过期时间,通过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】。

每次向优先队列中添加元素时根据compareTo方法作为排序规则,当然我们约定一下,默认q.peek()出来的就是最先过期的元素。

public interface Delayed extends Comparable<Delayed> {// 返回剩余时间long getDelay(TimeUnit unit);
}public interface Comparable<T> {// 定义比较方法public int compareTo(T o);
}

Delayed元素案例

学习了Delayed接口之后,我们看一个实际的案例,加深印象,源于:《Java并发编程之美》。

    static class DelayedElement implements Delayed {private final long delayTime; // 延迟时间private final long expire; // 到期时间private final String taskName; // 任务名称public DelayedElement (long delayTime, String taskName) {this.delayTime = delayTime;this.taskName = taskName;expire = now() + delayTime;}final long now () {return System.currentTimeMillis();}// 剩余时间 = 到期时间 - 当前时间@Overridepublic long getDelay (TimeUnit unit) {return unit.convert(expire - now(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo (Delayed o) {return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}@Overridepublic String toString () {final StringBuilder res = new StringBuilder("DelayedElement [ ");res.append("delay = ").append(delayTime);res.append(", expire = ").append(expire);res.append(", taskName = '").append(taskName).append('\'');res.append(" ] ");return res.toString();}}public static void main (String[] args) {// 创建delayQueue队列DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();// 创建延迟任务Random random = new Random();for (int i = 0; i < 10; i++) {DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i);delayQueue.offer(element);}// 依次取出任务并打印DelayedElement ele = null;try {for (; ; ) {while ((ele = delayQueue.take()) != null) {System.out.println(ele);}}} catch (InterruptedException ex) {ex.printStackTrace();}}
// 打印结果
DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ] 
DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ] 
DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ] 
DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ] 
DelayedElement [ delay = 174, expire = 1611995426233, taskName = 'task: 9' ] 
DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ] 
DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ] 
DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ] 
DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ] 
DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]
  • 实现了compareTo方法,定义比较规则为越早过期的排在队头。
  • 实现了getDelay方法,计算公式为:剩余时间 = 到期时间 - 当前时间。

构造器

DelayQueue构造器相比于前几个,就显得非常easy了。

    public DelayQueue() {}public DelayQueue(Collection<? extends E> c) {this.addAll(c);}

void put(E e)

因为DelayQueue是无界队列,不会因为边界问题产生阻塞,因此put操作和offer操作是一样的。

    public void put(E e) {offer(e);}public boolean offer(E e) {// 获取独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 加入优先队列里q.offer(e);// 判断堆顶元素是不是刚刚插入的元素// 如果判断为true,说明当前这个元素是将最先过期if (q.peek() == e) {// 重置leader线程为nullleader = null; // 激活available变量条件队列中的一个线程available.signal();}return true;} finally {lock.unlock();}}

E take()

take方法将会获取并移除队列里面延迟时间过期的元素 ,如果队列里面没有过期元素则陷入等待。

    public E take() throws InterruptedException {// 获取独占锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 瞅一瞅谁最快过期E first = q.peek();// 队列为空,则将当前线程置入available的条件队列中,直到里面有元素if (first == null)available.await();else {// 看下还有多久过期long delay = first.getDelay(NANOSECONDS);// 哇,已经过期了,就移除它并返回if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting// leader不为null表示其他线程也在执行take// 则将当前线程置入available的条件队列中if (leader != null)available.await();else {// 如果leader为null,则选择当前线程作为leader线程Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待delay时间,时间到之后,会出条件队列,继续竞争锁available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

first = null 有什么用

如果不设置first = null,将会引起内存泄露。

  • 线程A到达,队首元素没有到期,设置leader = 线程A,并且执行available.awaitNanos(delay);等待元素过期。
  • 这时线程B来了,因为leader != null,则会available.await();阻塞,线程C、D、E同理。
  • 线程A阻塞完毕了,再次循环,获取列首元素成功,出列。

这个时候列首元素应该会被回收掉,但是问题是它还被线程B、线程C持有着,所以不会回收,如果线程增多,且队首元素无限期的不能回收,就会造成内存泄漏。

总结

DelayQueue是一个支持延时获取元素无界阻塞队列,使用PriorityQueue来存储元素。

队中的元素必须实现Delayed接口【Delay接口又继承了Comparable,需要实现compareTo方法】,每个元素都需要指明过期时间,通过getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】,每次向优先队列中添加元素时根据compareTo方法作为排序规则。

基于Leader-Follower模式使用leader变量,减少不必要的线程等待。

DelayQueue是无界队列,因此插入操作是非阻塞的。但是take操作从队列获取元素时,是阻塞的,阻塞规则为:

  • 当一个线程调用队列的take方法,如果队列为空,则将会调用available.await()陷入阻塞。
  • 如果队列不为空,则查看队列的队首元素是否过期,根据getDelay的返回值是否小于0判断,如果过期则返回该元素。
  • 如果队首元素未过期,则判断当前线程是否为leader线程,如果不是,表明有其他线程在执行take操作,就调用available.await()陷入阻塞。
  • 如果没有其他线程在执行take,就将当前线程设置为leader,并等待队首元素过期,available.awaitNanos(delay)
  • leader线程退出take之后,将会调用available.signal()唤醒一个follower线程,接着回到开始那步。

参考阅读

  • 《Java并发编程的艺术》

  • 《Java并发编程之美》

  • 【死磕Java并发】—–J.U.C之阻塞队列:DelayQueue

这篇关于Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/582123

相关文章

Java实战之利用POI生成Excel图表

《Java实战之利用POI生成Excel图表》ApachePOI是Java生态中处理Office文档的核心工具,这篇文章主要为大家详细介绍了如何在Excel中创建折线图,柱状图,饼图等常见图表,需要的... 目录一、环境配置与依赖管理二、数据源准备与工作表构建三、图表生成核心步骤1. 折线图(Line Ch

Python3脚本实现Excel与TXT的智能转换

《Python3脚本实现Excel与TXT的智能转换》在数据处理的日常工作中,我们经常需要将Excel中的结构化数据转换为其他格式,本文将使用Python3实现Excel与TXT的智能转换,需要的可以... 目录场景应用:为什么需要这种转换技术解析:代码实现详解核心代码展示改进点说明实战演练:从Excel到

如何使用CSS3实现波浪式图片墙

《如何使用CSS3实现波浪式图片墙》:本文主要介绍了如何使用CSS3的transform属性和动画技巧实现波浪式图片墙,通过设置图片的垂直偏移量,并使用动画使其周期性地改变位置,可以创建出动态且具有波浪效果的图片墙,同时,还强调了响应式设计的重要性,以确保图片墙在不同设备上都能良好显示,详细内容请阅读本文,希望能对你有所帮助...

Spring Boot 3 整合 Spring Cloud Gateway实践过程

《SpringBoot3整合SpringCloudGateway实践过程》本文介绍了如何使用SpringCloudAlibaba2023.0.0.0版本构建一个微服务网关,包括统一路由、限... 目录引子为什么需要微服务网关实践1.统一路由2.限流防刷3.登录鉴权小结引子当前微服务架构已成为中大型系统的标

C# string转unicode字符的实现

《C#string转unicode字符的实现》本文主要介绍了C#string转unicode字符的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录1. 获取字符串中每个字符的 Unicode 值示例代码:输出:2. 将 Unicode 值格式化

python安装whl包并解决依赖关系的实现

《python安装whl包并解决依赖关系的实现》本文主要介绍了python安装whl包并解决依赖关系的实现,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录一、什么是whl文件?二、我们为什么需要使用whl文件来安装python库?三、我们应该去哪儿下

Python脚本实现图片文件批量命名

《Python脚本实现图片文件批量命名》这篇文章主要为大家详细介绍了一个用python第三方库pillow写的批量处理图片命名的脚本,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言源码批量处理图片尺寸脚本源码GUI界面源码打包成.exe可执行文件前言本文介绍一个用python第三方库pi

Java集合中的List超详细讲解

《Java集合中的List超详细讲解》本文详细介绍了Java集合框架中的List接口,包括其在集合中的位置、继承体系、常用操作和代码示例,以及不同实现类(如ArrayList、LinkedList和V... 目录一,List的继承体系二,List的常用操作及代码示例1,创建List实例2,增加元素3,访问元

Java中将异步调用转为同步的五种实现方法

《Java中将异步调用转为同步的五种实现方法》本文介绍了将异步调用转为同步阻塞模式的五种方法:wait/notify、ReentrantLock+Condition、Future、CountDownL... 目录异步与同步的核心区别方法一:使用wait/notify + synchronized代码示例关键

Nginx实现动态封禁IP的步骤指南

《Nginx实现动态封禁IP的步骤指南》在日常的生产环境中,网站可能会遭遇恶意请求、DDoS攻击或其他有害的访问行为,为了应对这些情况,动态封禁IP是一项十分重要的安全策略,本篇博客将介绍如何通过NG... 目录1、简述2、实现方式3、使用 fail2ban 动态封禁3.1 安装 fail2ban3.2 配