DelayQueue介绍

2023-11-29 08:52
文章标签 介绍 delayqueue

本文主要是介绍DelayQueue介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

5.1 DelayQueue介绍&应用

DelayQueue就是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。
需要让消息具有延迟的特性。
DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。查看DelayQueue类信息:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 发现DelayQueue中的元素,需要继承Delayed接口。
}
// ==========================================
// 接口继承了Comparable,这样就具备了比较的能力。
public interface Delayed extends Comparable<Delayed> {
// 抽象方法,就是咱们需要设置的延迟时间
long getDelay(TimeUnit unit);
// Comparable接口提供的:public int compareTo(T o);
}

基于上述特点,声明一个可以写入DelayQueue的元素类

public class Task implements Delayed {
/** 任务的名称 */
private String name;
/** 什么时间点执行 */
private Long time;
/**
*
* @param name
* @param delay 单位毫秒。
*/
public Task(String name, Long delay) {
// 任务名称
this.name = name;this.time = System.currentTimeMillis() + delay;
}
/**
* 设置任务什么时候可以出延迟队列
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 单位是毫秒,视频里写错了,写成了纳秒,
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 两个任务在插入到延迟队列时的比较方式
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}

在使用时,查看到DelayQueue底层用了PriorityQueue,在一定程度上,DelayQueue也是无界队列。
测试效果

public static void main(String[] args) throws InterruptedException {
// 声明元素
Task task1 = new Task("A",1000L);
Task task2 = new Task("B",5000L);
Task task3 = new Task("C",3000L);
Task task4 = new Task("D",2000L);
// 声明阻塞队列
DelayQueue<Task> queue = new DelayQueue<>();
// 将元素添加到延迟队列中
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 获取元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// A,D,C,B
}

在应用时,外卖,15分钟商家需要节点,如果不节点,这个订单自动取消。
可以每下一个订单,就放到延迟队列中,如果规定时间内,商家没有节点,直接通过消费者获取元素,然后取消订单。
只要是有需要延迟一定时间后,再执行的任务,就可以通过延迟队列去实现。

5.2、DelayQueue核心属性

可以查看到DelayQueue就四个核心属性

// 因为DelayQueue依然属于阻塞队列,需要保证线程安全。看到只有一把锁,生产者和消费者使用的是一个lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因为DelayQueue还是基于二叉堆结构实现的,没有必要重新搞一个二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般会存储等待栈顶数据的消费者,在整体写入和消费的过程中,会设置的leader的一些判断。
private Thread leader = null;
// 生产者在插入数据时,不会阻塞的。当前的Condition就是给消费者用的
// 比如消费者在获取数据时,发现栈顶的数据还又没到延迟时间。
// 这个时候,咱们就需要将消费者线程挂起,阻塞一会,阻塞到元素到了延迟时间,或者是,生产者插入的元素到了栈顶,此时生产者会唤醒消费者。
private final Condition available = lock.newCondition();

5.3、DelayQueue写入流程分析

Delay是无界的,数组可以动态的扩容,不需要关注生产者的阻塞问题,他就没有阻塞问题。
这里只需要查看offer方法即可。

public boolean offer(E e) {
// 直接获取lock,加锁。
final ReentrantLock lock = this.lock;lock.lock();
try {
// 直接调用PriorityQueue的插入方法,这里会根据之前重写Delayed接口中的compareTo方法做排序,然后调整上移和下移操作。
q.offer(e);
// 调用优先级队列的peek方法,拿到堆顶的数据
// 拿到堆顶数据后,判断是否是刚刚插入的元素
if (q.peek() == e) {
// leader赋值为null。在消费者的位置再提一嘴
leader = null;
// 唤醒消费者,避免刚刚插入的数据的延迟时间出现问题。
available.signal();
}
// 插入成功,
return true;
} finally {
// 释放锁
lock.unlock();
}
}

5.4、DelayQueue读取流程分析

消费者依然还是存在阻塞的情况,因为有两个情况
● 消费者要拿到栈顶数据,但是延迟时间还没到,此时消费者需要等待一会。
● 消费者要来拿数据,但是发现已经有消费者在等待栈顶数据了,这个后来的消费者也需要等待一会。
依然需要查看四个方法的实现

5.4.1 remove方法

// 依然是AbstractQueue提供的方法,有结果就返回,没结果扔异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

5.4.2 poll方法

// poll是浅尝一下,不会阻塞消费者,能拿就拿,拿不到就拉倒
public E poll() {
// 消费者和生产者是一把锁,先拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拿到栈顶数据。
E first = q.peek();
// 如果元素为null,直接返回null
// 如果getDelay方法返回的结果是大于0的,那说明当前元素还每到延迟时间,元素无法返回,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)return null;
else
// 到这说明元素不为null,并且已经达到了延迟时间,直接调用优先级队列的poll方法
return q.poll();
} finally {
// 释放锁。
lock.unlock();
}
}

5.4.3 poll(time,unit)方法

这个是允许阻塞的,并且指定一定的时间

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 先将时间转为纳秒
long nanos = unit.toNanos(timeout);
// 拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 死循环。
for (;;) {
// 拿到堆顶数据
E first = q.peek();
// 如果元素为nullif (first == null) {
// 并且等待的时间小于等于0。不能等了,直接返回null
if (nanos <= 0)
return null;
// 说明当前线程还有可以阻塞的时间,阻塞指定时间即可。
else
// 这里挂起线程后,说明队列没有元素,在生产者添加数据之后,会唤醒
nanos = available.awaitNanos(nanos);
// 到这说明,有数据
} else {
// 有数据的话,先获取数据现在是否可以执行,延迟时间是否已经到了指定时间
long delay = first.getDelay(NANOSECONDS);
// 延迟时间是否已经到了,
if (delay <= 0)
// 时间到了,直接执行优先级队列的poll方法,返回元素
return q.poll();
// ==================延迟时间没到,消费者需要等一会===================
// 这个是查看消费者可以等待的时间,
if (nanos <= 0)
// 直接返回nulll
return null;
// ==================延迟时间没到,消费者可以等一会===================
// 把first赋值为null
first = null;
// 如果等待的时间,小于元素剩余的延迟时间,消费者直接挂起。反正暂时拿不到,但是不能保证后续是否有生产者添加一个新的数据,我是可以拿到的。
// 如果已经有一个消费者在等待堆顶数据了,我这边不做额外操作,直接挂起即可。
if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);
// 当前消费者的阻塞时间可以拿到数据,并且没有其他消费者在等待堆顶数据
else {
// 拿到当前消费者的线程对象
Thread thisThread = Thread.currentThread();
// 将leader设置为当前线程
leader = thisThread;
try {
// 会让当前消费者,阻塞这个元素的延迟时间
long timeLeft = available.awaitNanos(delay);
// 重新计算当前消费者剩余的可阻塞时间,。
nanos -= delay - timeLeft;
} finally {
// 到了时间,将leader设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有消费者在等待元素,队列中的元素不为null
if (leader == null && q.peek() != null)
// 只要当前没有leader在等,并且队列有元素,就需要再次唤醒消费者。、
// 避免队列有元素,但是没有消费者处理的问题
available.signal();
// 释放锁lock.unlock();
}
}

5.4.4 take方法

这个是允许阻塞的,但是可以一直等,要么等到元素,要么等到被中断。

public E take() throws InterruptedException {
// 正常加锁,并且允许中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 拿到元素
E first = q.peek();
if (first == null)
// 没有元素挂起。
available.await();
else {
// 有元素,获取延迟时间。
long delay = first.getDelay(NANOSECONDS);
// 判断延迟时间是不是已经到了
if (delay <= 0)
// 基于优先级队列的poll方法返回
return q.poll();first = null;
// 如果有消费者在等,就正常await挂起
if (leader != null)
available.await();
// 如果没有消费者在等的堆顶数据,我来等
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// 设置为leader,代表等待堆顶的数据
leader = thisThread;
try {
// 等待指定(堆顶元素的延迟时间)时长,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
// leader赋值null
leader = null;
}
}
}
}
} finally {
// 避免消费者无线等,来一个唤醒消费者的方法,一般是其他消费者拿到元素走了之后,并且延迟队列还有元素,就执行if内部唤醒方法
if (leader == null && q.peek() != null)
available.signal();
// 释放锁
lock.unlock();}
}

这篇关于DelayQueue介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/432205

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

C++——stack、queue的实现及deque的介绍

目录 1.stack与queue的实现 1.1stack的实现  1.2 queue的实现 2.重温vector、list、stack、queue的介绍 2.1 STL标准库中stack和queue的底层结构  3.deque的简单介绍 3.1为什么选择deque作为stack和queue的底层默认容器  3.2 STL中对stack与queue的模拟实现 ①stack模拟实现

Mysql BLOB类型介绍

BLOB类型的字段用于存储二进制数据 在MySQL中,BLOB类型,包括:TinyBlob、Blob、MediumBlob、LongBlob,这几个类型之间的唯一区别是在存储的大小不同。 TinyBlob 最大 255 Blob 最大 65K MediumBlob 最大 16M LongBlob 最大 4G

FreeRTOS-基本介绍和移植STM32

FreeRTOS-基本介绍和STM32移植 一、裸机开发和操作系统开发介绍二、任务调度和任务状态介绍2.1 任务调度2.1.1 抢占式调度2.1.2 时间片调度 2.2 任务状态 三、FreeRTOS源码和移植STM323.1 FreeRTOS源码3.2 FreeRTOS移植STM323.2.1 代码移植3.2.2 时钟中断配置 一、裸机开发和操作系统开发介绍 裸机:前后台系

nginx介绍及常用功能

什么是nginx nginx跟Apache一样,是一个web服务器(网站服务器),通过HTTP协议提供各种网络服务。 Apache:重量级的,不支持高并发的服务器。在Apache上运行数以万计的并发访问,会导致服务器消耗大量内存。操作系统对其进行进程或线程间的切换也消耗了大量的CPU资源,导致HTTP请求的平均响应速度降低。这些都决定了Apache不可能成为高性能WEB服务器  nginx:

多路转接之select(fd_set介绍,参数详细介绍),实现非阻塞式网络通信

目录 多路转接之select 引入 介绍 fd_set 函数原型 nfds readfds / writefds / exceptfds readfds  总结  fd_set操作接口  timeout timevalue 结构体 传入值 返回值 代码 注意点 -- 调用函数 select的参数填充  获取新连接 注意点 -- 通信时的调用函数 添加新fd到

火语言RPA流程组件介绍--浏览网页

🚩【组件功能】:浏览器打开指定网址或本地html文件 配置预览 配置说明 网址URL 支持T或# 默认FLOW输入项 输入需要打开的网址URL 超时时间 支持T或# 打开网页超时时间 执行后后等待时间(ms) 支持T或# 当前组件执行完成后继续等待的时间 UserAgent 支持T或# User Agent中文名为用户代理,简称 UA,它是一个特殊字符串头,使得服务器