源码分析-PriorityBlockingQueue

2024-06-10 18:18

本文主要是介绍源码分析-PriorityBlockingQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

PriorityBLockingQueue-文档部分

doc文档

PriorityBlockingQueue是无界的阻塞队列。当然如果资源耗尽的看情况下也是会出现添加失败的情况。PriorityBlockingQueue提供的迭代器并不保证按照某一顺序顺序迭代所有元素(和有限队列一致,可以见另一篇博客源码分析-PriorityQueue)(它会先按数组迭代,然后再迭代漏掉的元素)。当两个元素相等的时候并不保证陷入先出顺序。所以如果需要保证先入先出顺序必须要保证

源码实现文档

这里使用了一个给予数组的堆实现,所有公共方法都知识用一个锁实现,但是在调整大小的阶段使用了一个自旋锁,以避免并发申请调整大小。这样可以避免等待的消费者的推迟,以及随之而来的元素重建问题。由于需要在申请新内存的过程中退出锁,所以不可以将任务简单的委托给一个PriorityQueue来完成。为了保证兼容性,在序列话的过程中使用了锁。为了增加保证兼容性。在序列化的过程中需要付出双倍的代价(先转换成一个PriorityQueue然后在序列化)

概述

大部分的内容都和之前的PriorityQueue类似。与PriorityQueue的区别主要体现在两点上,一个是在出队和入队的过程中增加了BlockingQueue的操作。另外一个在resize 的过程中使用了自旋锁。

    private static final int DEFAULT_INITIAL_CAPACITY = 11;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;private PriorityQueue q;

域的内容比较清楚,只做几点说明

首先一个MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;为什么要限制位Integer.MAX_VALUE-8,为什么要限制为-8。网上有人解释说这是因为有一部分虚拟机的实现是需要使用8位来储存size。在很多地方都出现了,这里做一下解释。
lock锁及其绑定的Condition是用来做阻塞操作的。因为这里是一个无界的队列所以这里只有一个状态变量。用于阻塞take。
而allocationSpinkLock是一个volatile的变量,用这个结合cas来进行自旋。

主要方法

构造器

构造器有很多,这里只看一个使用collection进行构造的构造器

    public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true;  // true if must screen for nullsif (c instanceof SortedSet<?>) {SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;}else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();}

这里使用两个boolean值来判断是否满足条件。
heapify表示是否需要进行构造堆。而screen表示是否需要排除null。
然后对这依照这两个变量分情况讨论。如果是sortSet类型,说明有序,则提取其comparator。并使heapify置位(后续不需要构造堆,因为递增序列是堆的一种特殊情况),如果是Priority自身拷贝当然两者都置为false表示都不需要进行。
然后使用Collection的toArray方法转换成Object[]并进行类型检查。并对各项参数进行复制。
heapify方法和之前PriorityQueue的是一样的,对数组的前半部分元素进行下滤操作。

阻塞方法的实现

非阻塞的put方法

BlockingQueue方法的实现都大同小异这里看一下put和take的实现:

  public void put(E e) {offer(e); // never need to block}

首先由于这是无界队列,所以不需要阻塞,只是讲任务委托给了offer来实现。

    public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}

因为这里没有阻塞,所以这里的锁完全是来进行同步的,不需要条件变量。
加锁后进行大小检查,如果需要增长则调用tryGrow。
否则则将新进入的元素放入队尾,然后进行上移操作。并唤醒notEmpty条件变量。

膨胀及自旋锁

        private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // must release and then re-acquire main lockObject[] newArray = null;if (allocationSpinLock == 0 &&UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflowint minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) // back off if another thread is allocatingThread.yield();lock.lock();if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}

首先退出锁。
首先这里申请了一个新的引用newArray。然后使用CAS自旋操作使得volatile的allocationSplinLock变成1。
对于自旋成功的线程,会计算出一个新的堆的尺寸,然后建立一个新的数组,并且在最后使得allocationSplinLock变回0.
对于自旋失败的数组,会让步,这里很好理解,因为下一步就是阻塞,所以即使这里争取到了线程也没有太大的意义。因为其newArray是空,而后退出tryGrow,但是该线程的增长并没有成功所以在offer的循环检查中(n = size) >= (cap = (array = queue).length)这里会失败所以需要重新进入tryGrow。这样做没有意义。
而yield会使得成功创建新数组的线程更容易获得锁,然后会使得queue指向新建的数组,这样对于自旋失败的线程通常来说会在自旋成功的线程之后获得锁,这样在offer中的自旋检测中成功通过,这样就避免了一些重复的操作。以提供更高的效率。
这里还需要说明的是yield,并不是100%会让出线程的,这里的yield只是让这个概率更大一些。

阻塞的take方法

    public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}

take方法则是常规的阻塞操作会重复检查notEmpty变量。
deque是典型的出队操作,类似PriorityQueue。

这篇关于源码分析-PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1048904

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。