本文主要是介绍死磕Java并发编程(9):无界线程安全队列ConcurrentLinkedQueue源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
这篇文章理解起来不难,相比于 ConcurrentHashMap 比较简单,因为不涉及扩容以及数据迁移等操作,相信你读完一定会有收获的。
本文是死磕Java并发编程系列文章的第 9 篇,主角就是 java 并发包中提供的 ConcurrentLinkedQueue
这是一个线程安全且无界的队列(因为是基于链表实现的,所以无界) ,在并发编程中经常需要用到线程安全的队列,面试线程池时,其中的队列也可以采用这个队列来实现,它线程安全,且采用先进先出的规则排序。
通过整个并发系列文章的学习,我们能想到如果要实现一个线程安全的队列那么有两种方式:一种是使用阻塞方式,即锁的形式,在出队、入队方法加上 synchronized 或者获取 lock 锁 等方式实现。另一种是非阻塞方式,使用自旋CAS的方式来实现。 而 Java 并发包中你会看到 Concurrent 开头的类都是支持并发的,也就是非阻塞的。
这篇文章我们一起来从源码分析下 并发大师 Doug Lea 是如何使用非阻塞的方式实现线程安全队列 ConcurrentLinkedQueue 的,相信从大师身上我们能学到不少并发技巧。
ConcurrentLinkedQueue 结构
通过 ConcurrentLinkedQueue 的类图来分析一下它的结构:
看以看出,ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点即子类 Node 由 节点属性 item 和 next(指向下一个节点Node的引用)组成。 即节点之间通过 next 关联,从而组成一张链表。
入队即每次将元素包装成节点放到链表结尾,出队从链表头删除一个元素返回。
了解了整体结构,你应该也看出来了,并发队列 ConcurrentLinkedQueue 最重要的就是俩操作,入队和出队,接下来我们直接从源码层面学习。
入队操作
入队过程其实就是将入队节点添加到队列尾部,这个入队操作,其实 Doug Lea 大师做了一些优化,为了一会看源码更加清晰,这里先看一组入队的过程图,直观的了解一下优化点。假设现在要插入四个节点:
通过上面入队的操作,观察到 head 和 tail 节点的变化,总结其实就是干了两件事:第一是将入队节点设置到当前队尾节点的 next 节点上;第二就是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置为 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设为 tail 节点的 next 节点。也就是说 tail 节点并不一定是尾结点,一定要清楚记着这一点,这对理解下面的入队源码非常有用。
下面我就不多说了,直接看代码,理解上面的描述,在结合代码注释,相信你一定能看懂:
// 将指定的元素插入到此队列的末尾,因为队列是无界的,所以这个方法永远不会返回 false 。
public boolean offer(E e) {checkNotNull(e);// 入队前,创建一个入队节点final Node<E> newNode = new Node<E>(e);// 死循环,入队不成功反复入队// 创建一个指向tail节点的引用,p用来表示队列的尾节点,默认情况下等于tail节点for (Node<E> t = tail, p = t;;) {// 获得p节点的下一个节点Node<E> q = p.next;// next节点为空,说明p是尾节点if (q == null) {// p is last node// p是尾结点,则设置p节点的next节点为入队节点if (p.casNext(null, newNode)) {// 首先要知道入队操作不是每次都设置tail节点为尾结点,为了减少CAS操作提高性能,也就是说tail节点不总是尾节点// 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,// p != t 这个条件者结合下面 else 分支看,下面在冲突的时候会修改 p 指向 p.next,所以导致p不等于 tail,// 即tail节点有大于等于1个的next节点if (p != t) // hop two nodes at a time// 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,// 这里允许失败,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点casTail(t, newNode); // Failure is OK.return true;}// Lost CAS race to another thread; re-read next}// 这个分支要想进去,即 p 节点等于 p.next 节点,只有一种可能,就是 p 节点和 p.next 节点都为空// 表示这队列刚初始化,正准备添加节点,所以需要返回headelse if (p == q)// 如果tail变了,说明被其他线程添加成功了,则 p 取新的 tail,否则 p 从 head 开始p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.// 进行这个分支说明next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点// 执行 p != t 说明尾结点不等于tail,t != (t = tail)) 说明tail做了变动,// 同时满足说明tail已经重新设置了,即结尾就是tail,否则尾结点取tail.nextp = (p != t && t != (t = tail)) ? t : q;}
}
从源代码角度来看,整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用 CAS 算法将入队节点设置成尾节点的 next 节点,如不成功则重试。
这里我们思考一下,上面我们分析出入队操作在先进先出队列中就是将其设置为尾结点, Doug Lea 大师的代码写的有点复杂,我们可以不可以用下面的代码来替代呢?
public boolean offer(E e) {if (e == null) throw new NullPointerException();Node<E> n = new Node<E>(e); for (;;) {Node<E> t = tail;if (t.casNext(null, n) && casTail(t, n)) { return true; } }
}
上面的代码每次入队都将 tail 设置为尾结点,这样能节省很多的代码量,并且更加容易理解。
但是这样做的缺点就是每次都要使用 循环 CAS 来进行设置 tail 节点。如果能减少 CAS 更新 tail 节点则能提高入队的效率。 但是我们同样要考虑到,由于 并不是 tail 一定等于尾结点,所以在入队定位末尾节点时就要多一次循环操作。
但是这样效率还是高的,因为 tail 节点它是 volatile 变量,本质上来看是通过增加 volatile 变量的读来减少 volatile 变量的写,而 对于 volatile 写作的开销是远远大于读操作的,所以入队效率会提升。大神对于性能的追求真实到了极致,源码读起来还是有用的吧!!
出队操作
说到出队操作,你肯定会想到,出队是不是也要减少更新 head 节点,而直接弹出 队首元素 从而减少 CAS 更新操作以提升性能呢?
带着这个问题,我们一起往下看,实现为了便于读者理解,这里还是先放一组出队操作的快照图。
从图中得知,并不是每次出队都会 更新 head 节点,如果 head 节点的有元素时,则直接弹出 head 中的元素,清空该节点对元素的引用。如果 head 节点中元素为空,才会更新 head 节点。
有了大概的理解,然后就可以去读源码来分析了:
// 从队头出队
public E poll() {restartFromHead:for (;;) {// p 表示头节点,需要出队的节点for (Node<E> h = head, p = h, q;;) {// 获取p节点的元素E item = p.item;// 如果头节点p中元素不为空,则进行CAS清空p节点对元素的应用,返回p节点的元素if (item != null && p.casItem(item, null)) {// CAS 设为成功后进入到这里,需要判断头节点p是否和head节点不是同一个了,即头节点p已经变更了if (p != h) // hop two nodes at a time// 更新head节点,将p节点的next节点设为head节点,如果p.next不为空则设置p.next,否则设置p本身updateHead(h, ((q = p.next) != null) ? q : p);return item;}// 到这说明头节点p中的元素为null或者发生冲突CAS失败,CAS失败也说明被别的线程取走了当前元素,所以就该取一个节点即next节点// 如果队列头节点p的next节点为空,说明队列已空,将则head设为p,返回nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 进到这里说明 (q = p.next) == null 返回false,即p的next不为空// 同时q=p.next,而这个分支是说p=p.next,只有队列初始化时满足条件,两者都为空,则返回head,重头开始赋值else if (p == q)continue restartFromHead;else// 说明p节点的next不为空,且队列不是初始化状态,所以头节点p指向p.nextp = q;}}
}
首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
出队操作时,也是当 head 节点不等于 头节点 p 时,再次出队,才会将 head 设置为 最新的队头节点,减少了 CAS 操作,提升了效率。
总结
- ConcurrentLinkedQueue 无界是因为结构是用链表组成的,天生无界,当然受到系统资源大小限制;
- ConcurrentLinkedQueue 在入队和出队时,均采用了减少 CAS 更新 head 和 tail 的操作,提升了性能;
- ConcurrentLinkedQueue 采用非阻塞模式实现,即无锁,通过自旋和 CAS 实现线程安全;
今天学习的并发包中线程安全的无界队列 ConcurrentLinkedQueue 源码也不难,相信会让你有更深的了解,方便以后在工作中使用和应付面试。
笔者水平有限,文章难免会有纰漏,如有错误欢迎一起交流探讨,我会第一时间更正的。都看到这里了,码字不易,可爱的你记得 “点赞” 哦,我需要你的正向反馈。
(全文完)fighting!
个人公众号
- 觉得写得还不错的小伙伴麻烦动手点赞+关注;
- 文章如果存在不正确的地方,麻烦指出,非常感谢您的阅读;
- 推荐大家关注我的公众号,会为你定期推送原创干货文章,拉你进优质学习社群;
- github地址:github.com/coderluojust/qige_blogs
这篇关于死磕Java并发编程(9):无界线程安全队列ConcurrentLinkedQueue源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!