JUC阻塞队列(四):DelayQueue

2024-08-21 19:12
文章标签 队列 阻塞 juc delayqueue

本文主要是介绍JUC阻塞队列(四):DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、DelayQueue介绍

      DelayQueue 是一个延迟队列,生产者写入一个数据,这个数据具有被直接消费的延迟时间,

      让数据具有延迟的特性。

      DelayQueue底层也是基于二叉堆来实现的,DelayQueue本就是基于PriorityBQueue 实现的。

      二叉堆结构每次获取的是堆顶数据,在比较时,根据延迟时间进行比较,延迟时间剩余端的放

      在堆顶。

      由于 DelayQueue 基于 PriorityQueue  实现的,因此 DelayQueue 理论上也是一个无边界队

      列,DelayQueue 容量可以进行无限扩容。

      

2、DelayQueue核心属性

      由DelayQueue 结构可以发现,DelayQueue 存储的数据必须实现 Delayed 接口

      DelayQueue 结构如下:

            

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {//锁,阻塞队列需要使用锁来保证线程安全//只有一把锁,表示生产者和消费者使用的是同一把锁private final transient ReentrantLock lock = new ReentrantLock();//基于优先级队列 PriorityQueueprivate final PriorityQueue<E> q = new PriorityQueue<E>();/*** leader 一般用来保存等待堆顶数据的消费者线程*/private Thread leader = null;/*** 基于 PriorityQueue(基于二叉堆)实现数据存储,生产者在插入数据时是不会阻塞的,* 当前的Condition就是给消费者用的,当消费者获取数据时,当堆顶数据的延迟时间还不为* 0(即还没到执行时间点),此时消费者线程会阻塞挂起等待一会(等待的是堆顶数据),直到堆顶数据延迟时间为0(到达任务执行时间点)* 或者 生产者新插入的数据到了堆顶,此时生产者会调用Condition.signal() 方法唤醒消费者线程*/private final Condition available = lock.newCondition();public DelayQueue() {}public DelayQueue(Collection<? extends E> c) {this.addAll(c);}
}/**Delayed 继承 Comparable接口,所以 Delayed 的实现都可以进行比较操作
*/
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);//获取延迟时间,比较延迟时间
}

3、DelayQueue使用示例

      DelayQueue 常用方法也是 BlockingQueue接口中定义的那几个存储数据和获取数据的方法,

      只有一点需要注意,即 DelayQueue 保存的数据必须实现接口Delayed 

       DelayQueue 使用示例如下:

                

public class TaskDelayed implements Delayed {private String name;/** 执行时间点*/private Long time;public TaskDelayed(String name,Long time){this.name = name;this.time = System.currentTimeMillis()+time;}/*** 设置 任务TaskDelayed 什么时候可以出延迟队列DelayedQueue* 该方法返回值小于等于0时任务才会从 延迟队列DelayedQueue 中取出执行**/@Overridepublic long getDelay(TimeUnit unit) {//TimeUnit.MILLISECONDS :将时间转换为毫秒return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);}/*** 比较器* 2个 TaskDelayed 任务在存储到延迟队列时的比较方式,通过time属性进行比较* 返回值:*    < 0: 按从小到大排列*    == 0 : 相等*    >0 : 从大到小排列* @param o* @return*/@Overridepublic int compareTo(Delayed o) {TaskDelayed task = (TaskDelayed) o;return (int)(this.time - task.getTime());}
}public class DelayedQueueDemo01 {public static void main(String[] args) throws InterruptedException {TaskDelayed task1 = new TaskDelayed("Tome",2000L);TaskDelayed task2 = new TaskDelayed("JieRui",4000L);TaskDelayed task3 = new TaskDelayed("zhuDy",3000L);TaskDelayed task4 = new TaskDelayed("zhanmusi",1000L);//DelayQueue 存放的数据必须实现接口DelayedDelayQueue<TaskDelayed> queue = new DelayQueue<>();//添加数据queue.add(task1);queue.offer(task2);queue.offer(task3,4, TimeUnit.SECONDS);queue.put(task4);//取数据System.out.println(queue.remove());//若堆顶数据的延迟时间还没到达,则poll()返回null,remove()会直接抛出异常System.out.println(queue.poll());System.out.println(queue.poll(5,TimeUnit.SECONDS));System.out.println(queue.take());}
}

4、DelayQueue写入流程分析

      因为 DelayQueue 底层是基于 PriorityQueue  实现的,也就是基于二叉堆实现的,所以

      DelayQueue 是一个无界的队列,存储数据的数组可以动态扩容,所以生产者不需要关注

      队列满了而阻塞的问题,因此这里只需要关注offer(E e) 方法就可以了

             offer(E e) 代码如下:

               

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {//直接调用PriorityQueue的插入方法q.offer(e);//判断数据插入后的二叉堆的堆顶元素是不是刚插入的数据,//若是,则说明当前堆顶数据可能已到达延迟时间可以进行消费,唤醒等待的消费者线程,并将当前等待的消费者线程设置为nullif (q.peek() == e) {/*** leader 赋值为null* todo 在消费者消费数据时会判断leader 是否为null*/leader = null;/*** 唤醒挂起阻塞的消费者线程,避免刚插入的数据的延迟时间出现问题* 这里可以发现消费者等待的是堆顶数据*/available.signal();}return true;} finally {lock.unlock();}}

              

5、DelayQueue取数据流程分析

      消费者取数据过程需要考虑阻塞问题:

              1)队列为空,无数据,消费者线程需要挂起等待一会

              2)堆顶数据的延迟时间还没到,此时消费者线程需要挂起等待一会

              3)当消费者A已经在等待堆顶数据,此时消费B也过来取数据,此时消费者B需要

                    挂起等待一会

5.1、remove() 

        该方法功能是取数据,取堆顶数据,若取不到数据,则直接抛出异常。

        注意:若堆顶数据的延迟时间还没到达,则取不到数据,也会抛出异常

        remove 方法如下:

              

5.2、poll()

         该方法功能是读取数据,poll()方法不会阻塞消费者,能获取数据就直接返回,否则返回null

         poll 方法代码如下:

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//查看堆顶数据E first = q.peek();/***first == null 表示堆为空,没有数据* getDelay 方法返回值大于0,表示堆顶数据还没到延迟时间,不能执行,堆顶数据无法取出*/if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}

5.3、poll(long timeout, TimeUnit unit)

         带超时时间的读取数据的方法

         poll方法代码如下:

            

/***带超时时间的取数据方法*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {//将超时时间转换为纳秒long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//加锁,可被中断,当被中断时会抛出异常,直接退出lock.lockInterruptibly();try {//自旋for (;;) {//查看堆顶数据E first = q.peek();//若堆顶数据为空,即堆无数据,则判断超时时间是否已过,若超时时间也已经过了,则返回nullif (first == null) {if (nanos <= 0)return null;else//阻塞等待,并返回剩余超时时间//等待生产者线程添加数据之后唤醒nanos = available.awaitNanos(nanos);} else {//堆有数据//获取堆顶数据的延迟时间,单位纳秒long delay = first.getDelay(NANOSECONDS);//若堆顶数据的延迟时间小于等于0,表示当前堆顶数据可以执行,立即取出if (delay <= 0)return q.poll();//若堆顶数据的延迟时间大于0(表示堆顶数据还不能执行)且超时时间已经过了,则返回nullif (nanos <= 0)return null;/*** 指定到这里,说明堆顶数据的延迟时间大于0(表示延迟时间没到,堆顶数据还不能执行)且方法超时时间还没过* 消费者需要挂起等待*/first = null; // don't retain ref while waiting//方法剩余超时时间小于堆顶数据的延迟时间,则消费者线程继续阻塞,并返回剩余超时时间/*** todo 疑问:这里为什么不直接结束,反正最终是无法获取数据的?*           因为 你不确定在剩余的超时时间nanos内,是否有新的数据插入(新插入的数据可能延迟时间很短),*           前边offer(E e)新增数据后也会唤醒等待的消费者线程*           第二个条件 leader != null,leader != null表示前边已经有*           消费者线程在挂起阻塞堆顶数据的延迟时间到期,后边的消费者线程执行到这里*           需要直接阻塞挂起,这样避免 leader 的重复赋值*/if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {//方法剩余超时时间大于堆顶数据的延迟时间,表示当前消费者可以在超时时间nanos内拿到堆顶数据,// 且当前没有消费者在等待堆顶数据//将leader 设置为 当前消费者线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//阻塞等待,并返回剩余的阻塞时间//当前消费者阻塞堆顶数据的延迟时间long timeLeft = available.awaitNanos(delay);//更新剩余的可阻塞时间,已消耗的超时时间是 delay - timeLeftnanos -= delay - timeLeft;} finally {//堆顶数据的延迟时间到了,将 leader 设置为null//这一步只有生产者和消费者自己可以做if (leader == thisThread)leader = null;}}}}} finally {//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

5.4、take()

         读取数据,允许中断,若队列为空则一直阻塞,直到队列有数据 或 被中断时异常退出

         take() 方法代码如下:     

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//加锁,允许中断,中断异常退出lock.lockInterruptibly();try {//自旋for (;;) {//查看堆顶数据E first = q.peek();//若队列没有数据,则阻塞,由生产者唤醒 或者被中断异常退出if (first == null)available.await();else {//队列不为空//获取堆顶数据的延迟时间long delay = first.getDelay(NANOSECONDS);//表示堆顶数据可以执行,则从队列中取出数据if (delay <= 0)return q.poll();//执行到这里,表示堆顶数据的延迟时间还没到,消费者线程需要阻塞,等待堆顶数据到达其延迟时间first = null; // don't retain ref while waiting//前边有消费者线程在阻塞等待堆顶数据的延迟时间到达,则当前线程直接阻塞if (leader != null)available.await();else {//当前没有消费者线程在等待堆顶数据的延迟时间到达,则把当前消费者设置为等待堆顶数据延迟时间到达的线程Thread thisThread = Thread.currentThread();leader = thisThread;try {//阻塞,阻塞时间是堆顶数据的延迟时间available.awaitNanos(delay);} finally {//阻塞结束,获取到堆顶数据后将 leader 设置为Nullif (leader == thisThread)leader = null;}}}}} finally {//没有消费者线程在等待堆顶数据的延迟时间且堆不为空,此时可能还有消费者在阻塞,则唤醒这些阻塞的消费者线程if (leader == null && q.peek() != null)available.signal();lock.unlock();}}

这篇关于JUC阻塞队列(四):DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094031

相关文章

Redis延迟队列的实现示例

《Redis延迟队列的实现示例》Redis延迟队列是一种使用Redis实现的消息队列,本文主要介绍了Redis延迟队列的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习... 目录一、什么是 Redis 延迟队列二、实现原理三、Java 代码示例四、注意事项五、使用 Redi

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

springboot体会BIO(阻塞式IO)

使用springboot体会阻塞式IO 大致的思路为: 创建一个socket服务端,监听socket通道,并打印出socket通道中的内容。 创建两个socket客户端,向socket服务端写入消息。 1.创建服务端 public class RedisServer {public static void main(String[] args) throws IOException {

多路转接之select(fd_set介绍,参数详细介绍),实现非阻塞式网络通信

目录 多路转接之select 引入 介绍 fd_set 函数原型 nfds readfds / writefds / exceptfds readfds  总结  fd_set操作接口  timeout timevalue 结构体 传入值 返回值 代码 注意点 -- 调用函数 select的参数填充  获取新连接 注意点 -- 通信时的调用函数 添加新fd到

FreeRTOS学习笔记(六)队列

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、队列的基本内容1.1 队列的引入1.2 FreeRTOS 队列的功能与作用1.3 队列的结构体1.4 队列的使用流程 二、相关API详解2.1 xQueueCreate2.2 xQueueSend2.3 xQueueReceive2.4 xQueueSendFromISR2.5 xQueueRecei