源码分析-PriorityBlockingQueue

2024-06-10 18:18

本文主要是介绍源码分析-PriorityBlockingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PriorityBLockingQueue-文档部分

doc文档

PriorityBlockingQueue是无界的阻塞队列。当然如果资源耗尽的看情况下也是会出现添加失败的情况。PriorityBlockingQueue提供的迭代器并不保证按照某一顺序顺序迭代所有元素(和有限队列一致,可以见另一篇博客源码分析-PriorityQueue)(它会先按数组迭代,然后再迭代漏掉的元素)。当两个元素相等的时候并不保证陷入先出顺序。所以如果需要保证先入先出顺序必须要保证

源码实现文档

这里使用了一个给予数组的堆实现,所有公共方法都知识用一个锁实现,但是在调整大小的阶段使用了一个自旋锁,以避免并发申请调整大小。这样可以避免等待的消费者的推迟,以及随之而来的元素重建问题。由于需要在申请新内存的过程中退出锁,所以不可以将任务简单的委托给一个PriorityQueue来完成。为了保证兼容性,在序列话的过程中使用了锁。为了增加保证兼容性。在序列化的过程中需要付出双倍的代价(先转换成一个PriorityQueue然后在序列化)

概述

大部分的内容都和之前的PriorityQueue类似。与PriorityQueue的区别主要体现在两点上,一个是在出队和入队的过程中增加了BlockingQueue的操作。另外一个在resize 的过程中使用了自旋锁。

    private static final int DEFAULT_INITIAL_CAPACITY = 11;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;private PriorityQueue q;

域的内容比较清楚,只做几点说明

首先一个MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;为什么要限制位Integer.MAX_VALUE-8,为什么要限制为-8。网上有人解释说这是因为有一部分虚拟机的实现是需要使用8位来储存size。在很多地方都出现了,这里做一下解释。
lock锁及其绑定的Condition是用来做阻塞操作的。因为这里是一个无界的队列所以这里只有一个状态变量。用于阻塞take。
而allocationSpinkLock是一个volatile的变量,用这个结合cas来进行自旋。

主要方法

构造器

构造器有很多,这里只看一个使用collection进行构造的构造器

    public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true;  // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}

这里使用两个boolean值来判断是否满足条件。
heapify表示是否需要进行构造堆。而screen表示是否需要排除null。
然后对这依照这两个变量分情况讨论。如果是sortSet类型,说明有序,则提取其comparator。并使heapify置位(后续不需要构造堆,因为递增序列是堆的一种特殊情况),如果是Priority自身拷贝当然两者都置为false表示都不需要进行。
然后使用Collection的toArray方法转换成Object[]并进行类型检查。并对各项参数进行复制。
heapify方法和之前PriorityQueue的是一样的,对数组的前半部分元素进行下滤操作。

阻塞方法的实现

非阻塞的put方法

BlockingQueue方法的实现都大同小异这里看一下put和take的实现:

  public void put(E e) {offer(e); // never need to block}

首先由于这是无界队列,所以不需要阻塞,只是讲任务委托给了offer来实现。

    public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}

因为这里没有阻塞,所以这里的锁完全是来进行同步的,不需要条件变量。
加锁后进行大小检查,如果需要增长则调用tryGrow。
否则则将新进入的元素放入队尾,然后进行上移操作。并唤醒notEmpty条件变量。

膨胀及自旋锁

        private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

首先退出锁。
首先这里申请了一个新的引用newArray。然后使用CAS自旋操作使得volatile的allocationSplinLock变成1。
对于自旋成功的线程,会计算出一个新的堆的尺寸,然后建立一个新的数组,并且在最后使得allocationSplinLock变回0.
对于自旋失败的数组,会让步,这里很好理解,因为下一步就是阻塞,所以即使这里争取到了线程也没有太大的意义。因为其newArray是空,而后退出tryGrow,但是该线程的增长并没有成功所以在offer的循环检查中(n = size) >= (cap = (array = queue).length)这里会失败所以需要重新进入tryGrow。这样做没有意义。
而yield会使得成功创建新数组的线程更容易获得锁,然后会使得queue指向新建的数组,这样对于自旋失败的线程通常来说会在自旋成功的线程之后获得锁,这样在offer中的自旋检测中成功通过,这样就避免了一些重复的操作。以提供更高的效率。
这里还需要说明的是yield,并不是100%会让出线程的,这里的yield只是让这个概率更大一些。

阻塞的take方法

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}

take方法则是常规的阻塞操作会重复检查notEmpty变量。
deque是典型的出队操作,类似PriorityQueue。

这篇关于源码分析-PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1048904

相关文章

Spring Boot Interceptor的原理、配置、顺序控制及与Filter的关键区别对比分析

《SpringBootInterceptor的原理、配置、顺序控制及与Filter的关键区别对比分析》本文主要介绍了SpringBoot中的拦截器(Interceptor)及其与过滤器(Filt... 目录前言一、核心功能二、拦截器的实现2.1 定义自定义拦截器2.2 注册拦截器三、多拦截器的执行顺序四、过

C++ scoped_ptr 和 unique_ptr对比分析

《C++scoped_ptr和unique_ptr对比分析》本文介绍了C++中的`scoped_ptr`和`unique_ptr`,详细比较了它们的特性、使用场景以及现代C++推荐的使用`uni... 目录1. scoped_ptr基本特性主要特点2. unique_ptr基本用法3. 主要区别对比4. u

Nginx内置变量应用场景分析

《Nginx内置变量应用场景分析》Nginx内置变量速查表,涵盖请求URI、客户端信息、服务器信息、文件路径、响应与性能等类别,这篇文章给大家介绍Nginx内置变量应用场景分析,感兴趣的朋友跟随小编一... 目录1. Nginx 内置变量速查表2. 核心变量详解与应用场景3. 实际应用举例4. 注意事项Ng

Java多种文件复制方式以及效率对比分析

《Java多种文件复制方式以及效率对比分析》本文总结了Java复制文件的多种方式,包括传统的字节流、字符流、NIO系列、第三方包中的FileUtils等,并提供了不同方式的效率比较,同时,还介绍了遍历... 目录1 背景2 概述3 遍历3.1listFiles()3.2list()3.3org.codeha

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe