并发编程-延时队列DelayQueue

2023-10-21 07:28

本文主要是介绍并发编程-延时队列DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数据结构学习网站:

Data Structure Visualization

思维导图

      

DelayQueue (延时队列)

          DelayQueue 是一个支持延时获取元素的阻塞队列 , 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。
        延迟队列的特点是: 不是先进先出,而是会按照延迟时间的 长短来排序,下一个即将执行的任务会排到队列的最前面。
它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接
口,所以自然就拥有了比较和排序的能力,代码如下:
public interface Delayed extends Comparable<Delayed> {//getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,//如果返回 0 或者负数则代表任务已过期。//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。long getDelay(TimeUnit unit);}

DelayQueue使用

DelayQueue 实现延迟订单
在实现一个延迟订单的场景中,我们可以定义一个 Order 类,其中包含订单的基本信息,例如订单编 号、订单金额、订单创建时间等。同时,我们可以让 Order 类实现 Delayed 接口,重写 getDelay 和 compareTo 方法。在 getDelay 方法中,我们可以计算订单的剩余延迟时间,而在 compareTo 方法 中,我们可以根据订单的延迟时间进行比较。
下面是一个简单的示例代码,演示了如何使用 DelayQueue 来实现一个延迟订单的场景:
public class DelayQueueExample {public static void main(String[] args) throws InterruptedException {DelayQueue<Order> delayQueue = new DelayQueue<>();// 添加三个订单,分别延迟 5 秒、2 秒和 3 秒delayQueue.put(new Order("order1", System.currentTimeMillis(), 5000));delayQueue.put(new Order("order2", System.currentTimeMillis(), 2000));delayQueue.put(new Order("order3", System.currentTimeMillis(), 3000));// 循环取出订单,直到所有订单都被处理完毕while (!delayQueue.isEmpty()) {Order order = delayQueue.take();System.out.println("处理订单:" + order.getOrderId());}}static class  Order implements Delayed{private String orderId;private long createTime;private long delayTime;public Order(String orderId, long createTime, long delayTime) {this.orderId = orderId;this.createTime = createTime;this.delayTime = delayTime;}public String getOrderId() {return orderId;}@Overridepublic long getDelay(TimeUnit unit) {long diff = createTime + delayTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {long diff = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return Long.compare(diff, 0);}}
}
        由于每个订单都有不同的延迟时间,因此它们将会按照延迟时间的顺序被取出。当延迟时间到达时, 对应的订单对象将会被从队列中取出,并被处理。

DelayQueue原理

数据结构
 //用于保证队列操作的线程安全private final transient ReentrantLock lock = new ReentrantLock();// 优先级队列,存储元素,用于保证延迟低的优先执行private final PriorityQueue<E> q = new PriorityQueue<E>();// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程private Thread leader = null;// 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知private final Condition available = lock.newCondition();public DelayQueue() {}public DelayQueue(Collection<? extends E> c) {this.addAll(c);}

入队put方法

    public void put(E e) {offer(e);}public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 入队q.offer(e);if (q.peek() == e) {// 若入队的元素位于队列头部,说明当前元素延迟最小// 将 leader 置空leader = null;// available条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();}return true;} finally {lock.unlock(); // 解锁,真正唤醒阻塞的线程}}

出队take方法

 public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。else {long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素return q.poll();// 如果delay大于0 ,则下面要阻塞了// 将first置为空方便gcfirst = null;// 如果有线程争抢的Leader线程,则进行无限期等待。if (leader != null)available.await();else {// 如果leader为null,把当前线程赋值给它Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待剩余等待时间available.awaitNanos(delay);} finally {// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素if (leader == thisThread)leader = null;}}}}} finally {// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程if (leader == null && q.peek() != null)// available条件队列转同步队列,准备唤醒阻塞在available上的线程available.signal();// 解锁,真正唤醒阻塞的线程lock.unlock();}}
1. 当获取元素时,先获取到锁对象。
2. 获取最早过期的元素,但是并不从队列中弹出元素。
3. 最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
4. 如果最早过期的元素不为空
5. 获取最早过期元素的剩余过期时间,如果已经过期则直接返回当前元素
6. 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,则当前线程进 行无限期等待,如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
7. 最后将Leader线程设置为空
8. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。

如何选择适合的阻塞队列

 选择策略

通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
功能
        第 1 个需要考虑的就是 功能层面 ,比如是否需要阻塞队列帮我们排序,如 优先级排序、延迟执行 等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队 列。
容量
        第 2 个需要考虑的是 容量 ,或者说 是否有存储的要求 ,还是只需要“直接传递”。在考虑这一点
的时候,我们知道前面介绍的那几种阻塞队列,有的是 容量固定的,如 ArrayBlockingQueue ;有的 默认是 容量无限的,如 LinkedBlockingQueue ;而有的里面 没有任何容量,如
SynchronousQueue ;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE 。所以不同阻塞队列的容量是千差万别的, 我们需要根据任务数量来推算出合适的容量 ,从而去选取合适的 BlockingQueue。
能否扩容
        第 3 个需要考虑的是 能否扩容 。因为有时我们并不能在初始的时候很好的准确估计队列的大小, 因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue , 因为它的容量在创建时就确定了,无法扩容。相反 PriorityBlockingQueue 即使在指定了初始容量 之后,后续如果有需要,也可以自动扩容 。所以 我们可以根据是否需要扩容来选取合适的队列。
内存结构
        第 4 个需要 考虑的点就是内存结构 。我们分析过 ArrayBlockingQueue 的源码,看到了它的内部 结构是“数组” 的形式。和它不同的是, LinkedBlockingQueue 的内部是用链表 实现的,所以这里就需要我们考虑到, ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高 。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
性能
        第 5 点就是 从性能的角度去考虑 。比如 LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细 在并发程度高的时候,相对于 只有一把锁的 ArrayBlockingQueue 性能会更好 。另外, SynchronousQueue 性能往往优于其他实现 ,因为 它只需要“直接传递” ,而不需要存储的过程。 如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue

 线程池对于阻塞队列的选择

 线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
Executors 类下的线程池类型:
FixedThreadPool (SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue
CachedThreadPool 选取的是 SynchronousQueue
ScheduledThreadPool SingleThreadScheduledExecutor同理)选取的是延迟队列

这篇关于并发编程-延时队列DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/252845

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用