并发队列之DelayQueue

2023-12-21 11:08
文章标签 并发 队列 delayqueue

本文主要是介绍并发队列之DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  已经说了四个并发队列了,DelayQueue这是最后一个,这是一个无界阻塞延迟队列,底层基于前面说过的PriorityBlockingQueue实现的 ,队列中每个元素都有过期时间,当从队列获取元素时,只有过期元素才会出队列,而队列头部的元素是过期最快的元素;

一.简单使用

  可以看到我们可以自己设置超时时间和优先级队列中的比较规则,这样我们在队列中取的时候,按照最快超时的先出队;

package com.example.demo.study;import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;import lombok.Data;public class Study0210 {@Datastatic class MyDelayed implements Delayed {private long delayTime;//该任务需要再队列中的延迟的时候private long expire;//这个时间表示当前时间和延迟时间相加,这里就叫做到期时间private String taskName;//任务的名称public MyDelayed(long delayTime, String taskName) {this.delayTime = delayTime;this.taskName = taskName;this.expire = System.currentTimeMillis()+delayTime;}//指定优先级队列里面的比较规则,就跟上篇博客中说的优先级队列中说的比较器一样
        @Overridepublic int compareTo(Delayed o) {return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));}//这个方法表示该任务在队列中还有多少剩余时间,也就是expire-当前时间
        @Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire-System.currentTimeMillis(), TimeUnit.MILLISECONDS);}}public static void main(String[] args) throws InterruptedException {//创建延迟队列DelayQueue<MyDelayed> queue = new DelayQueue<MyDelayed>();//创建任务丢到队列中Random random = new Random();for (int i = 1; i < 11; i++) {MyDelayed myDelayed = new MyDelayed(random.nextInt(500),"task"+i);queue.add(myDelayed);}//获取队列中的任务,这里只会跟超时时间最小的有关,和入队顺序无关MyDelayed myDelayed = queue.take();while(myDelayed!=null) {System.out.println(myDelayed.toString());myDelayed = queue.take();}}
}

二.基本组成 

//由此可是这个队列中存放的任务必须是Delayed类型的
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {//独占锁private final transient ReentrantLock lock = new ReentrantLock();//优先级队列private final PriorityQueue<E> q = new PriorityQueue<E>();//leader线程,实际上每次进行入队和出队操作的只能是leader线程,其余的都叫做fallower线程,这里会用到一个leader-follower模式private Thread leader = null;//条件变量private final Condition available = lock.newCondition();//省略很多代码
}

  具体的继承关系可以看看下面这个图,实际操作的都是内部的PriorityQueue;

三.offer方法

  上面代码中我们虽然说调用的是add方法,其实就是调用的是offer方法;

public boolean offer(E e) {final ReentrantLock lock = this.lock;//获取锁
    lock.lock();try {//往优先级队列中添加一个元素
        q.offer(e);//注意,peek方法只是获取优先级队列中第一个元素,并不会删除//如果优先级队列中取的元素就是和当前添加的元素一样,说明当前元素就是达到过期要求的,于是设置leader线程为null//然后通知条件队列中的线程优先级队列中已经有元素了,可以过来取了if (q.peek() == e) {leader = null;available.signal();}return true;} finally {//释放锁
        lock.unlock();}
}

四.take方法

  获取并移除队列中达到超时时间要求的元素,如果队列中没有元素,就把当前线程丢到条件队列中阻塞;

  从下面的代码逻辑中我们可以知道:线程分为两种,一种是leader线程,一种是follower线程,其中leader线程只会阻塞一定的时间,follower线程会在条件队列阻塞无限长的时间;当leader线程执行完take操作之后,就会重置leader线程为null,然后从条件队列中拿一个出来设置为leader线程

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//获取锁,可中断
    lock.lockInterruptibly();try {for (;;) {//这里先是尝试从优先级队列中获取一下节点,获取不到的话,说明当前优先级队列为空,就阻塞当前线程E first = q.peek();if (first == null)available.await();else {//如果优先级队列中有元素,那么肯定能走到这里来,然后取到该元素的超时时间,如果小于0,说明已经达到要求了,可以获取并删除队列中的元素long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return q.poll();first = null; // don't retain ref while waiting//如果leader队列不为空,说明有其他线程正在执行take,于是就把当前线程放到条件队列中if (leader != null)available.await();//到这里,说明优先级队列中没有元素到超时时间,而且此时没有其他线程调用take方法,于是就把leader线程设置为当前线程,//然后当前leader线程就会等待一定的时间,等优先级队列中最快超时的元素;//在等待的时候,leader线程会释放锁,这时其他线程B可以调用offer方法添加元素,线程C也可以调用take方法,然后线程C就会在//上面这里阻塞无限长的时间,直到被唤醒else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {//当前线程阻塞一定时间之后,不管成功了没有,都会把leader线程重置为null,然后重新循环if (leader == thisThread)leader = null;}}}}//这里的意思就是当前线程移除元素成功之后,唤醒条件队列中的线程去继续从队列中获取元素} finally {if (leader == null && q.peek() != null)available.signal();//释放锁
        lock.unlock();}
}

五.poll操作

  获取并移除队头过期元素,如果队列为空,或者对头元素没有过超时时间就返回null

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//尝试获取队头元素,如果队头元素为空或者该延迟过期时间还没到,就返回nullE first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;else//否则就获取并移除队头元素return q.poll();} finally {lock.unlock();}
}

六.总结

  这个队列其实很容易,主要的是有一个延迟时间,我们从优先级队列中获取的根节点首先会判断有没有过超时时间,有的话就移除并返回就好了,没有的话,就看看还剩下多少时间才会超时(由于是优先级队列,所以根节点一般就是最快超时时间的,当然,也可以修改优先级队列的比较规则),于是当前线程就会等这个节点超时,此时leader等于当前线程,在等待的过程中,会释放锁,所以其他线程可以往队列中添加元素,也可以获取元素(但是由于此时leader!=null,这些线程会阻塞无限长时间直到被唤醒);

  在leader线程超时时间到了之后自动唤醒,再进行一次循环,就会获取并移除根节点了,最后再重置leader节点为null,顺便唤醒条件队列中的节点;

这篇关于并发队列之DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/519747

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

java线程深度解析(四)——并发模型(Master-Worker)

http://blog.csdn.net/daybreak1209/article/details/51372929 二、Master-worker ——分而治之      Master-worker常用的并行模式之一,核心思想是由两个进程协作工作,master负责接收和分配任务,worker负责处理任务,并把处理结果返回给Master进程,由Master进行汇总,返回给客

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理 秒杀系统是应对高并发、高压力下的典型业务场景,涉及到并发控制、库存管理、事务管理等多个关键技术点。本文将深入剖析秒杀商品业务中常见的几个核心问题,包括 AOP 事务管理、同步锁机制、乐观锁、CAS 操作,以及用户限购策略。通过这些技术的结合,确保秒杀系统在高并发场景下的稳定性和一致性。 1. AOP 代理对象与事务管理 在秒杀商品