Java 源码 - DelayQueue 源码解析

2024-04-30 14:28
文章标签 java 源码 解析 delayqueue

本文主要是介绍Java 源码 - DelayQueue 源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

        • 1. 整体设计
          • 1.1 类注释
          • 1.2、类图
          • 1.3 延迟队列的属性
          • 1.4 DelayQueue 的主要方法
            • 1.4.1 offer 添加元素
            • 1.4.2 take 取出元素
            • 1.4.3 poll 取出元素

1. 整体设计

DelayQueue 延迟队列底层使用的是锁的能力,比如说要在当前时间往后延迟 5 秒执行,那么当前线程就会沉睡 5 秒,等 5 秒后线程被唤醒时,如果能获取到资源的话,线程即可立马执行。原理上似乎很简单,但内部实现却很复杂,有很多难点,比如当运行资源不够,多个线程同时被唤醒时,如何排队等待?比如说在何时阻塞?何时开始执行等等?接下来我们从源码角度来看下是如何实现的。

1.1 类注释

类注释上比较简单,只说了三个概念:

  1. 无界延时队列中元素将在过期时被执行,越靠近队头,越早过期;
  2. 未过期的元素不能够被 take;
  3. 不允许空元素。
1.2、类图

DelayQueue 的类图,关键是 DelayQueue 类上是有泛型的,如下:
image.png

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>implements BlockingQueue<E> {

从泛型中可以看出,DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,如下:

public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);
}

也就是说 DelayQueue 队列中的元素必须是实现 Delayed 接口和 Comparable 接口的,并覆写了 getDelay 方法和 compareTo 的方法才行,不然在编译时,编译器就会提醒我们元素必须强制实现 Delayed 接口。
compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素(注意是查看不是取出),然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。

1.3 延迟队列的属性

DelayQueue 中的重要属性如下所示。

// 可重入锁,用于保证线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// DelayQueue 的实现依赖于 PriorityQueue(优先队列),用于存储元素,并按过期时间优先排序
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化内部阻塞通知的线程
// 第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能
private Thread leader = null;
// 用于实现阻塞的 Condition 对象
private final Condition available = lock.newCondition();

DelayQueue 内部使用非线程安全的优先队列(PriorityQueue),并使用 Leader-Followers (领导者-追随者)模式,最小化不必要的等待时间。什么是领导者-追随者模式.

1.4 DelayQueue 的主要方法
1.4.1 offer 添加元素
public boolean offer(E e) {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lock();try {// 向优先队列中插入元素q.offer(e);// 检验元素是否为队首,是则设置 leader 为 null, 并唤醒一个消费线程 if (q.peek() == e) {leader = null;available.signal();}return true;} finally {// 释放全局独占锁lock.unlock();}}
leader 是等待获取队头元素的线程,领导者-追随者模式设计减少不必要的等待。
如果 leader != null,表示已经有线程在等待获取队头元素,会通过 await() 方法让出当前线程等待信号。
如果 leader == null,则把当前线程设置为 leader,当一个线程为 leader 时,会使用 awaitNanos() 让当前线程等待接受信号,或等待 delay 时间。

DelayQueue 的其他入队方法,如 add(E e) 和 put(E e) 方法,都是调用上述 offer(E e) 方法实现的。

1.4.2 take 取出元素

take() 方法取出队列元素,当没有元素被取出时,该方法阻塞。
一开始看到全局独占锁,理所当然详情属于队列消费模式。 无法理解 “领导者-追随者模式”。take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。 如果多个线程调用take() 方法, 当available.awaitNanos(delay);的时候, 其它线程可以抢锁进入。 下面有测试例子。源码中:java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject.await()和await(long time, TimeUnit unit); 方法 Node node = addConditionWaiter(); long savedState = fullyRelease(node); 队列状态释放
take方法主要实现逻辑为(for循环体):
1. 获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待调用offer方法,放入对象后,通过signal()方法唤醒。【看offer方法的源码】
2. 如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3. 如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4. 如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5. finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。

public E take() throws InterruptedException {// 获取全局独占锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 获取队头元素,peek 方法不会删除元素E first = q.peek();if (first == null)// 若队头为空,则阻塞当前线程available.await();else {// 否则获取队头元素的超时时间long delay = first.getDelay(NANOSECONDS);// 已超时,直接出队if (delay <= 0)return q.poll();// 释放 first 的引用,避免内存泄漏first = null; // don't retain ref while waiting// leader != null 表明有其他线程在操作,阻塞当前线程if (leader != null)available.await();else {// leader 指向当前线程Thread thisThread = Thread.currentThread();leader = thisThread;try {// 超时阻塞available.awaitNanos(delay);} finally {// 释放 leaderif (leader == thisThread)leader = null;}}}}} finally {// leader 为 null 并且队列不为空,说明没有其他线程在等待,那就通知条件队列if (leader == null && q.peek() != null)available.signal();// 释放全局独占锁    lock.unlock();}
}

Condition.await() 和Condition.await(100, TimeUnit.SECONDS); 方法进入等待时候,其它线程可以抢抢到锁

package com.lvyuanj.test.timer;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
public class TestConditionAwait {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {final Condition condition = lock.newCondition();final Thread thread1 = new Thread(new Runnable() {@Overridepublic void run() {lock.lock();try {Thread.currentThread().setName("ConditionAwait");log.error(Thread.currentThread().getName() + " beforeAwaitTime:" + System.currentTimeMillis());condition.await(100, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error(Thread.currentThread().getName() + " finishAwaitTime:" + System.currentTimeMillis());} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});Thread thread2 = new Thread(new Runnable() {@Overridepublic void run() {Thread.currentThread().setName("ConditionSignal");try {lock.lock();log.error(Thread.currentThread().getName() + " getLockTime:" + System.currentTimeMillis());//thread1.interrupt();long currentTime = System.currentTimeMillis();while (System.currentTimeMillis() - currentTime < 8000) {}condition.signal();log.error(Thread.currentThread().getName() + " signalTime:" + System.currentTimeMillis());} catch (Exception e) {} finally {lock.unlock();log.error(Thread.currentThread().getName() + " unlockTime:" + System.currentTimeMillis());}}});thread1.start();Thread.sleep(50);thread2.start();}
}
1.4.3 poll 取出元素

取出队头元素,当延迟队列中没有到期的元素可以取出时,返回 null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}
}

这篇关于Java 源码 - DelayQueue 源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949109

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听