Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析

本文主要是介绍Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • LinkedBlockingQueue概述
    • 类图结构及重要字段
    • 构造器
    • 出队和入队操作
      • 入队enqueue
      • 出队dequeue
    • 阻塞式操作
      • E take() 阻塞式获取
      • void put(E e) 阻塞式插入
      • E poll(timeout, unit) 阻塞式超时获取
      • boolean offer(e, timeout, unit) 阻塞式超时插入
    • 其他常规操作
      • boolean offer(E e)
      • E poll()
      • E peek()
      • Boolean remove(Object o)
    • 总结
    • 参考阅读

系列传送门:

  • Java并发包源码学习系列:AbstractQueuedSynchronizer
  • Java并发包源码学习系列:CLH同步队列及同步资源获取与释放
  • Java并发包源码学习系列:AQS共享式与独占式获取与释放资源的区别
  • Java并发包源码学习系列:ReentrantLock可重入独占锁详解
  • Java并发包源码学习系列:ReentrantReadWriteLock读写锁解析
  • Java并发包源码学习系列:详解Condition条件队列、signal和await
  • Java并发包源码学习系列:挂起与唤醒线程LockSupport工具类
  • Java并发包源码学习系列:JDK1.8的ConcurrentHashMap源码解析
  • Java并发包源码学习系列:阻塞队列BlockingQueue及实现原理分析
  • Java并发包源码学习系列:阻塞队列实现之ArrayBlockingQueue源码解析

LinkedBlockingQueue概述

LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。

LinkedBlockingQueue支持FIFO先进先出的次序对元素进行排序。

类图结构及重要字段

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;// 单链表节点static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}/** 容量,如果不指定就是Integer.MAX_VALUE */private final int capacity;/** 原子变量,记录元素个数 */private final AtomicInteger count = new AtomicInteger();/*** 哨兵头节点,head.next才是队列的第一个元素*/transient Node<E> head;/*** 指向最后一个元素*/private transient Node<E> last;/** 用来控制同时只有一个线程可以从队头获取元素 */private final ReentrantLock takeLock = new ReentrantLock();/** 条件队列,队列为空时,执行出队take操作的线程将会被置入该条件队列 */private final Condition notEmpty = takeLock.newCondition();/** 用来控制同时只有一个线程可以从队尾插入元素 */private final ReentrantLock putLock = new ReentrantLock();/** 条件队列,队列满时,执行入队操作put的线程将会被置入该条件队列 */private final Condition notFull = putLock.newCondition();
}
  • 单向链表实现,维护head和last两个Node节点,head是哨兵节点,head.next是第一个真正的元素,last指向队尾节点。
  • 队列中的元素通过AtomicInteger类型的原子变量count记录。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

构造器

使用LinkedBlockingQueue的时候,可以指定容量,也可以使用默认的Integer.MAX_VALUE,几乎就是无界的了,当然,也可以传入集合对象,直接构造。

	// 如果不指定容量,默认容量为Integer.MAX_VALUE  (1 << 30) - 1public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}// 传入指定的容量public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;// 初始化last 和 head指针last = head = new Node<E>(null);}// 传入指定集合对象,容量视为Integer.MAX_VALUE,直接构造queuepublic LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);// 写线程获取putLockfinal ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibilitytry {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}

出队和入队操作

队列的操作最核心的部分莫过于入队和出队了,后面分析的方法基本上都基于这两个工具方法。

LinkedBlockingQueue的出队和入队相对ArrayBlockingQueue来说就简单很多啦:

入队enqueue

    private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;}
  1. 将node连接到last的后面。
  2. 更新last指针的位置,指向node。

出队dequeue

    private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first; // head向后移一位E x = first.item;first.item = null;return x;}

队列中的元素实际上是从head.first开始的,那么移除队头,其实就是将head指向head.next即可。

阻塞式操作

E take() 阻塞式获取

take操作将会获取当前队列头部元素并移除,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;// 首先要获取takeLockfinal ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 如果队列为空, notEmpty不满足,就等着while (count.get() == 0) {notEmpty.await();}// 出队x = dequeue();// c先赋值为count的值, count 减 1c = count.getAndDecrement();// 这次出队后至少还有一个元素,唤醒notEmpty中的读线程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// c == capacity 表示在该元素出队之前,队列是满的if (c == capacity)// 因为在这之前队列是满的,可能会有写线程在等着,这里做个唤醒signalNotFull();return x;}// 用于唤醒写线程private void signalNotFull() {final ReentrantLock putLock = this.putLock;// 获取putLockputLock.lock();try {notFull.signal();} finally {putLock.unlock();}}

void put(E e) 阻塞式插入

put操作将向队尾插入元素,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列不满。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 所有的插入操作 都约定 本地变量c 作为是否失败的标识int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 插入操作获取 putLockputLock.lockInterruptibly();try {// 队列满,这时notFull条件不满足,awaitwhile (count.get() == capacity) {notFull.await();}enqueue(node);// c先返回count的值 , 原子变量 + 1 ,c = count.getAndIncrement();// 至少还有一个空位可以插入,notFull条件是满足的,唤醒它if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// c == 0 表示在该元素入队之前,队列是空的if (c == 0)// 因为在这之前队列是空的,可能会有读线程在等着,这里做个唤醒signalNotEmpty();}// 用于唤醒读线程private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;// 获取takeLocktakeLock.lock();try {// 唤醒notEmpty.signal();} finally {takeLock.unlock();}}

E poll(timeout, unit) 阻塞式超时获取

在take阻塞式获取方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 这里就是超时机制的逻辑所在while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

boolean offer(e, timeout, unit) 阻塞式超时插入

在put阻塞式插入方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

其他常规操作

boolean offer(E e)

offer(E e)是非阻塞的方法,向队尾插入一个元素,如果队列未满,则插入成功并返回true;如果队列已满则丢弃当前元素,并返回false。

    public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;// 此时队列已满,直接返回falseif (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);// 插入操作 获取putLockfinal ReentrantLock putLock = this.putLock;putLock.lock();try {// 加锁后再校验一次if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0; // 只要不是-1,就代表成功~}

E poll()

从队列头部获取并移除第一个元素,如果队列为空则返回null。

    public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// 如果队列不为空,则出队, 并递减计数if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

E peek()

瞅一瞅队头的元素是啥,如果队列为空,则返回null。

    public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// 实际上第一个元素是head.nextNode<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}

Boolean remove(Object o)

移除队列中与元素o相等【指的是equals方法判定相同】的元素,移除成功返回true,如果队列为空或没有匹配元素,则返回false。

    public boolean remove(Object o) {if (o == null) return false;fullyLock();try {// trail 和 p 同时向后遍历, 如果p匹配了,就让trail.next = p.next代表移除pfor (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}// trail为p的前驱, 希望移除p节点void unlink(Node<E> p, Node<E> trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item = null;trail.next = p.next;// 移除p// 如果p已经是最后一个节点了,就更新一下lastif (last == p)last = trail;// 移除一个节点之后,队列从满到未满, 唤醒notFullif (count.getAndDecrement() == capacity)notFull.signal();}//----- 多个锁 获取和释放的顺序是 相反的// 同时上锁void fullyLock() {putLock.lock();takeLock.lock();}// 同时解锁void fullyUnlock() {takeLock.unlock();putLock.unlock();}

总结

  • LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

参考阅读

  • 《Java并发编程之美》
  • 《Java并发编程的艺术》

这篇关于Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/582121

相关文章

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.