JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解

2023-10-08 11:28

本文主要是介绍JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解

本文是JUC第十七讲:JUC集合 - ConcurrentLinkedQueue详解。ConcurerntLinkedQueue一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用null元素。

文章目录

  • JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解
    • 1、带着BAT大厂的面试问题去理解
    • 2、ConcurrentLinkedQueue数据结构
    • 3、ConcurrentLinkedQueue源码分析
      • 3.1、类的继承关系
      • 3.2、类的内部类
      • 3.3、类的属性
      • 3.4、类的构造函数
      • 3.5、核心函数分析
        • 1、offer函数
        • 2、poll函数
        • 3、remove函数
        • 4、size函数
    • 4、ConcurrentLinkedQueue示例
      • 4.1、ConcurrentLinkedQueue在商品中心的应用
    • 5、再深入理解
      • 5.1、HOPS(延迟更新的策略)的设计
      • 5.2、ConcurrentLinkedQueue适合的场景
    • 6、参考文章

1、带着BAT大厂的面试问题去理解

请带着这些问题继续后文,会很大程度上帮助你更好的理解相关知识点。

  • 要想用线程安全的队列有哪些选择? Vector,Collections.synchronizedList(List<T> list), ConcurrentLinkedQueue等
  • ConcurrentLinkedQueue实现的数据结构?
  • ConcurrentLinkedQueue底层原理? 全程无锁(CAS)
  • ConcurrentLinkedQueue的核心方法有哪些? offer(),poll(),peek(),isEmpty()等队列常用方法
  • 说说ConcurrentLinkedQueue的HOPS(延迟更新的策略)的设计?
  • ConcurrentLinkedQueue适合什么样的使用场景?

2、ConcurrentLinkedQueue数据结构

通过源码分析可知,ConcurrentLinkedQueue的数据结构与LinkedBlockingQueue的数据结构相同,都是使用的链表结构。ConcurrentLinkedQueue的数据结构如下:

  • img

说明:ConcurrentLinkedQueue采用的链表结构,并且包含有一个头节点和一个尾结点。

3、ConcurrentLinkedQueue源码分析

3.1、类的继承关系

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>implements Queue<E>, java.io.Serializable {}

说明:ConcurrentLinkedQueue 继承了抽象类 AbstractQueue,AbstractQueue定义了对队列的基本操作;同时实现了Queue接口,Queue定义了对队列的基本操作,同时,还实现了Serializable接口,表示可以被序列化。

3.2、类的内部类

private static class Node<E> {// 元素volatile E item;// next域volatile Node<E> next;/*** Constructs a new node.  Uses relaxed write because item can* only be seen after publication via casNext.*/// 构造函数Node(E item) {// 设置item的值UNSAFE.putObject(this, itemOffset, item);}// 比较并替换item值boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {// 设置next域的值,并不会保证修改对其他线程立即可见UNSAFE.putOrderedObject(this, nextOffset, val);}// 比较并替换next域的值boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}// Unsafe mechanics// 反射机制private static final sun.misc.Unsafe UNSAFE;// item域的偏移量private static final long itemOffset;// next域的偏移量private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

说明: Node类表示链表结点,用于存放元素,包含item域和next域,item域表示元素,next域表示下一个结点,其利用反射机制和CAS机制来更新item域和next域,保证原子性。

3.3、类的属性

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>implements Queue<E>, java.io.Serializable {// 版本序列号        private static final long serialVersionUID = 196745693267521676L;// 反射机制private static final sun.misc.Unsafe UNSAFE;// head域的偏移量private static final long headOffset;// tail域的偏移量private static final long tailOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentLinkedQueue.class;headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));} catch (Exception e) {throw new Error(e);}}// 头节点private transient volatile Node<E> head;// 尾结点private transient volatile Node<E> tail;
}

说明: 属性中包含了head域和tail域,表示链表的头节点和尾结点,同时,ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头节点和尾结点,保证原子性。

3.4、类的构造函数

  • ConcurrentLinkedQueue()型构造函数
public ConcurrentLinkedQueue() {// 初始化头节点与尾结点head = tail = new Node<E>(null);
}

说明: 该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue,头节点与尾结点指向同一个结点,该结点的item域为null,next域也为null。

  • ConcurrentLinkedQueue(Collection<? extends E>)型构造函数
public ConcurrentLinkedQueue(Collection<? extends E> c) {Node<E> h = null, t = null;for (E e : c) { // 遍历c集合// 保证元素不为空checkNotNull(e);// 新生一个结点Node<E> newNode = new Node<E>(e);if (h == null) // 头节点为null// 赋值头节点与尾结点h = t = newNode;else {// 直接头节点的next域t.lazySetNext(newNode);// 重新赋值头节点t = newNode;}}if (h == null) // 头节点为null// 新生头节点与尾结点h = t = new Node<E>(null);// 赋值头节点head = h;// 赋值尾结点tail = t;
}

说明: 该构造函数用于创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。

3.5、核心函数分析

1、offer函数
public boolean offer(E e) {// 元素不为nullcheckNotNull(e);// 新生一个结点final Node<E> newNode = new Node<E>(e);for (Node<E> t = tail, p = t;;) { // 无限循环// q为p结点的下一个结点Node<E> q = p.next;if (q == null) { // q结点为null// p is last nodeif (p.casNext(null, newNode)) { // 比较并进行替换p结点的next域// Successful CAS is the linearization point// for e to become an element of this queue,// and for newNode to become "live".if (p != t) // p不等于t结点,不一致    // hop two nodes at a time// 比较并替换尾结点casTail(t, newNode);  // Failure is OK.// 返回return true;}// Lost CAS race to another thread; re-read next}else if (p == q) // p结点等于q结点// We have fallen off list.  If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable.  Else the new tail is a better bet.// 原来的尾结点与现在的尾结点是否相等,若相等,则p赋值为head,否则,赋值为现在的尾结点p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.// 重新赋值p结点p = (p != t && t != (t = tail)) ? t : q;}
}

说明: offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作,队列状态的变化(假设单线程添加元素,连续添加10、20两个元素)。

img

  • 若ConcurrentLinkedQueue的初始状态如上图所示,即队列为空。单线程添加元素,此时,添加元素10,则状态如下所示
    img

  • 如上图所示,添加元素10后,tail没有变化,还是指向之前的结点,继续添加元素20,则状态如下所示
    img

  • 如上图所示,添加元素20后,tail指向了最新添加的结点。

2、poll函数
public E poll() {restartFromHead:for (;;) { // 无限循环for (Node<E> h = head, p = h, q;;) { // 保存头节点// item项E item = p.item;if (item != null && p.casItem(item, null)) { // item不为null并且比较并替换item成功// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // p不等于h    // hop two nodes at a time// 更新头节点updateHead(h, ((q = p.next) != null) ? q : p); // 返回itemreturn item;}else if ((q = p.next) == null) { // q结点为null// 更新头节点updateHead(h, p);return null;}else if (p == q) // p等于q// 继续循环continue restartFromHead;else// p赋值为qp = q;}}
}

说明: 此函数用于获取并移除此队列的头,如果此队列为空,则返回null。下面模拟poll函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,poll两次)。
img

  • 队列初始状态如上图所示,在poll操作后,队列的状态如下图所示
    img

  • 如上图可知,poll操作后,head改变了,并且head所指向的结点的item变为了null。再进行一次poll操作,队列的状态如下图所示。
    img

  • 如上图可知,poll操作后,head结点没有变化,只是指示的结点的item域变成了null。

3、remove函数
public boolean remove(Object o) {// 元素为null,返回if (o == null) return false;Node<E> pred = null;for (Node<E> p = first(); p != null; p = succ(p)) { // 获取第一个存活的结点// 第一个存活结点的item值E item = p.item;if (item != null &&o.equals(item) &&p.casItem(item, null)) { // 找到item相等的结点,并且将该结点的item设置为null// p的后继结点Node<E> next = succ(p);if (pred != null && next != null) // pred不为null并且next不为null// 比较并替换next域pred.casNext(p, next);return true;}// pred赋值为ppred = p;}return false;
}

说明: 此函数用于从队列中移除指定元素的单个实例(如果存在)。其中,会调用到first函数和succ函数,first函数的源码如下

Node<E> first() {restartFromHead:for (;;) { // 无限循环,确保成功for (Node<E> h = head, p = h, q;;) {// p结点的item域是否为nullboolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) { // item不为null或者next域为null// 更新头节点updateHead(h, p);// 返回结点return hasItem ? p : null;}else if (p == q) // p等于q// 继续从头节点开始continue restartFromHead;else// p赋值为qp = q;}}
}

说明: first函数用于找到链表中第一个存活的结点。succ函数源码如下

final Node<E> succ(Node<E> p) {// p结点的next域Node<E> next = p.next;// 如果next域为自身,则返回头节点,否则,返回nextreturn (p == next) ? head : next;
}

说明: succ用于获取结点的下一个结点。如果结点的next域指向自身,则返回head头节点,否则,返回next结点。下面模拟remove函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,执行remove(10)、remove(20)操作)。
img

  • 如上图所示,为ConcurrentLinkedQueue的初始状态,remove(10)后的状态如下图所示
    img

  • 如上图所示,当执行remove(10)后,head指向了head结点之前指向的结点的下一个结点,并且head结点的item域置为null。继续执行remove(20),状态如下图所示
    img

  • 如上图所示,执行remove(20)后,head与tail指向同一个结点,item域为null。

4、size函数
public int size() {// 计数int count = 0;for (Node<E> p = first(); p != null; p = succ(p)) // 从第一个存活的结点开始往后遍历if (p.item != null) // 结点的item域不为null// Collection.size() spec says to max outif (++count == Integer.MAX_VALUE) // 增加计数,若达到最大值,则跳出循环break;// 返回大小return count;
}

说明:此函数用于返回ConcurrenLinkedQueue的大小,从第一个存活的结点(first)开始,往后遍历链表,当结点的item域不为null时,增加计数,之后返回大小。

4、ConcurrentLinkedQueue示例

下面通过一个示例来了解ConcurrentLinkedQueue的使用

import java.util.concurrent.ConcurrentLinkedQueue;class PutThread extends Thread {private ConcurrentLinkedQueue<Integer> clq;public PutThread(ConcurrentLinkedQueue<Integer> clq) {this.clq = clq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("add " + i);clq.add(i);Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}class GetThread extends Thread {private ConcurrentLinkedQueue<Integer> clq;public GetThread(ConcurrentLinkedQueue<Integer> clq) {this.clq = clq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("poll " + clq.poll());Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}
}public class ConcurrentLinkedQueueDemo {public static void main(String[] args) {ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();PutThread p1 = new PutThread(clq);GetThread g1 = new GetThread(clq);p1.start();g1.start();}
}

运行结果(某一次):

add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8

说明: GetThread线程不会因为ConcurrentLinkedQueue队列为空而等待,而是直接返回null,所以当实现队列不空时,等待时,则需要用户自己实现等待逻辑。

4.1、ConcurrentLinkedQueue在商品中心的应用

todo

5、再深入理解

5.1、HOPS(延迟更新的策略)的设计

通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail更新触发时机:当tail指向的节点的下一个节点不为null的时候,会执行定位队列真正的队尾节点的操作,找到队尾节点后完成插入之后才会通过casTail进行tail更新;当tail指向的节点的下一个节点为null的时候,只插入节点不更新tail。
  • head更新触发时机:当head指向的节点的item域为null的时候,会执行定位队列真正的队头节点的操作,找到队头节点后完成删除之后才会通过updateHead进行head更新;当head指向的节点的item域不为null的时候,只删除节点不更新head。

并且在更新操作时,源码中会有注释为:hop two nodes at a time。所以这种延迟更新的策略就被叫做HOPS的大概原因是这个,从上面更新时的状态图可以看出,head和tail的更新是“跳着的”即中间总是间隔了一个。那么这样设计的意图是什么呢?

如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS进行tail的更新,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率,所以doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然,这样设计会多出在循环中定位队尾节点,但总体来说读的操作效率要远远高于写的性能,因此,多出来的在循环中定位尾节点的操作的性能损耗相对而言是很小的。

5.2、ConcurrentLinkedQueue适合的场景

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。整体的思路都是比较严谨的,这个也是使用了无锁造成的,我们自己使用无锁的条件的话,这个队列是个不错的参考。

6、参考文章

  • 【JUC】JDK1.8源码分析之ConcurrentLinkedQueue
  • 并发容器之ConcurrentLinkedQueue
  • java并发面试常识之ConcurrentLinkedQueue

这篇关于JUC第十七讲:JUC集合: ConcurrentLinkedQueue详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/164944

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

uva 11178 计算集合模板题

题意: 求三角形行三个角三等分点射线交出的内三角形坐标。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#include <stack>#include <vector>#include <

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

嵌入式Openharmony系统构建与启动详解

大家好,今天主要给大家分享一下,如何构建Openharmony子系统以及系统的启动过程分解。 第一:OpenHarmony系统构建      首先熟悉一下,构建系统是一种自动化处理工具的集合,通过将源代码文件进行一系列处理,最终生成和用户可以使用的目标文件。这里的目标文件包括静态链接库文件、动态链接库文件、可执行文件、脚本文件、配置文件等。      我们在编写hellowor

LabVIEW FIFO详解

在LabVIEW的FPGA开发中,FIFO(先入先出队列)是常用的数据传输机制。通过配置FIFO的属性,工程师可以在FPGA和主机之间,或不同FPGA VIs之间进行高效的数据传输。根据具体需求,FIFO有多种类型与实现方式,包括目标范围内FIFO(Target-Scoped)、DMA FIFO以及点对点流(Peer-to-Peer)。 FIFO类型 **目标范围FIFO(Target-Sc

019、JOptionPane类的常用静态方法详解

目录 JOptionPane类的常用静态方法详解 1. showInputDialog()方法 1.1基本用法 1.2带有默认值的输入框 1.3带有选项的输入对话框 1.4自定义图标的输入对话框 2. showConfirmDialog()方法 2.1基本用法 2.2自定义按钮和图标 2.3带有自定义组件的确认对话框 3. showMessageDialog()方法 3.1

脏页的标记方式详解

脏页的标记方式 一、引言 在数据库系统中,脏页是指那些被修改过但还未写入磁盘的数据页。为了有效地管理这些脏页并确保数据的一致性,数据库需要对脏页进行标记。了解脏页的标记方式对于理解数据库的内部工作机制和优化性能至关重要。 二、脏页产生的过程 当数据库中的数据被修改时,这些修改首先会在内存中的缓冲池(Buffer Pool)中进行。例如,执行一条 UPDATE 语句修改了某一行数据,对应的缓

OmniGlue论文详解(特征匹配)

OmniGlue论文详解(特征匹配) 摘要1. 引言2. 相关工作2.1. 广义局部特征匹配2.2. 稀疏可学习匹配2.3. 半稠密可学习匹配2.4. 与其他图像表示匹配 3. OmniGlue3.1. 模型概述3.2. OmniGlue 细节3.2.1. 特征提取3.2.2. 利用DINOv2构建图形。3.2.3. 信息传播与新的指导3.2.4. 匹配层和损失函数3.2.5. 与Super