一篇文章搞懂 SynchronousQueue

2023-12-08 08:58

本文主要是介绍一篇文章搞懂 SynchronousQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见100个问题搞定Java并发

正文

WHAT

在对 Executors 的介绍中,提到了一个非常特殊的等待队列 SynchronousQueue

详情请见我的这篇博客——Executors 源码解析(JDK8)

SynchronousQueue 的容量为 0 ,任何一个对 SynchronousQueue 的写需要等待一个对 SynchronousQueue 的读,反之亦然。

因此, SynchronousQueue 与其说是一个队列,不如说是一个数据交换通道

HOW

那 SynchronousQueue 的奇妙功能是如何实现的呢?

SynchronousQueue 和无锁的操作脱离不了关系,实际上 SynchronousQueue 内部也大量使用了无锁工具。

对 SynchronousQueue 来说,它将 put() 和 take() 两种功能截然不同的方法抽象为一个共同的方法 Transferer.transfer() 。

从字面上看,这就是数据传递的意思。

源码分析(JDK8)

/*** Shared internal API for dual stacks and queues.*/abstract static class Transferer<E> {/*** Performs a put or take.** @param e if non-null, the item to be handed to a consumer;*          if null, requests that transfer return an item*          offered by producer.* @param timed if this operation should timeout* @param nanos the timeout, in nanoseconds* @return if non-null, the item provided or received; if null,*         the operation failed due to timeout or interrupt --*         the caller can distinguish which of these occurred*         by checking Thread.interrupted.*/abstract E transfer(E e, boolean timed, long nanos);}

当参数 e 为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。

timed 参数决定是否存在 timeout 时间, nanos 决定了 timeout 的时长。

如果返回值非空,则表示数据已经接受或者正常提供;如果为空,则表示失败(超时或者中断)。

SynchronousQueue 内部会维护一个线程等待队列。

等待队列中会保存等待线程及相关数据的信息。

比如,生产者将数据放入 SynchronousQueue 时,如果没有消费者接收,那么数据本身和线程对象都会打包在队列中等待(因为 SynchronousQueue 容积为 0 ,没有数据可以正常放入)。

3 步

Transferer.transfer() 函数的实现是 SynchronousQueue 的核心,它大体上分为三个步骤。

  1. 如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。 比如,等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待。 进入等待队列的线程可能会被挂起,它们会等待一个“匹配”操作
  2. 如果等待队列中的元素和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让它“匹配”到一个等待节点上。 接着弹出这两个节点,并且使得对应的两个线程继续执行。
  3. 如果线程发现等待队列的节点就是“完成”节点,那么帮助这个节点完成任务,其流程和步骤2是一致的。

步骤 1 的实现如下:

				SNode h = head;if (h == null || h.mode == mode) {  // empty or same-modeif (timed && nanos <= 0) {      // can't waitif (h != null && h.isCancelled())casHead(h, h.next);     // pop cancelled nodeelsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);if (m == s) {               // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next);     // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}}

在上述代码中,第 1 行 SNode 表示等待队列中的节点。

内部封装了当前线程、 next 节点、匹配节点、数据内容等信息。

第 2 行,判断当前等待队列为空,或者队列中元素的模式与本次操作相同(比如,都是读操作,那么都必须要等待)。

第 8 行,生成一个新的节点并置于队列头部,这个节点就代表当前线程。

如果入队成功,则执行第 9 行 awaitFulfill() 函数。

该函数会进行自旋等待,并最终挂起当前线程。

直到一个与之对应的操作产生,将其唤醒。

线程被唤醒后(表示已经读取到数据或者自己产生的数据己经被别的线程读取),在第 14 ~ 15 行尝试帮助对应的线程完成两个头部节点的出队操作(这仅仅是友情帮助),并在最后返回读取或者写入的数据(第 16 行)。

步骤 2 的实现如下:

				else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled())            // already cancelledcasHead(h, h.next);         // pop and retryelse if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next;       // m is s's matchif (m == null) {        // all waiters are gonecasHead(s, null);   // pop fulfill nodes = null;           // use new node next timebreak;              // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn);     // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else                  // lost matchs.casNext(m, mn);   // help unlink}}}

在上述代码中,首先判断头部节点是否处于 fullfill 模式。

如果是,则需要进入步骤3。

否则,将视自己为对应的 fullfill 线程。

第 4 行生成一个 SNode 元素,设置为 fullfill 模式并将其压入队列头部。

接着,设置 m (原始的队列头部)为 s 的匹配节点(第 13 行),这个 tryMatch() 方法将会激活一个等待线程,并将 m 传递给那个线程。

如果设置成功,则表示数据投递完成,将 s 和 m 两个节点弹出即可(第 14 行)。

如果 tryMatch ()方法失败,则表示已经有其他线程帮助完成了操作,那么删除 m 节点即可(第 17 行),因为这个节点的数据已经被投递,不需要再次处理,再次跳转到第 5 行的循环体,进行下一个等待线程的匹配和数据投递,直到队列中没有等待线程为止。

步骤 3 的实现如下:

如果线程在执行时,发现头部元素恰好是 fulfill 模式,它就会帮助 fulfill 节点尽快被执行。

				else {                            // help a fulfillerSNode m = h.next;               // m is h's matchif (m == null)                  // waiter is gonecasHead(h, null);           // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h))          // help matchcasHead(h, mn);         // pop both h and melse                        // lost matchh.casNext(m, mn);       // help unlink}}

上述代码的执行原理和步骤2完全一致。

唯一的不同是步骤3不会返回,因为步骤3所进行的工作是帮助其他线程尽快投递它们的数据,而自己并没有完成对应的操作。

因此,线程进入步骤3后,再次进入大循环体(代码中没有给出),从步骤1开始重新判断条件和投递数据。

从整个数据投递的过程中可以看到,在 SynchronousQueue 中,参与工作的所有线程不仅仅是竟争资源的关系,更重要的是,它们彼此之间还会互相帮助。

在一个线程内部,可能会帮助其他线程完成它们的工作。

这种模式可以在更大程度上减少饥饿的可能,提高系统整体的并行度。

这篇关于一篇文章搞懂 SynchronousQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/469270

相关文章

一文带你搞懂Nginx中的配置文件

《一文带你搞懂Nginx中的配置文件》Nginx(发音为“engine-x”)是一款高性能的Web服务器、反向代理服务器和负载均衡器,广泛应用于全球各类网站和应用中,下面就跟随小编一起来了解下如何... 目录摘要一、Nginx 配置文件结构概述二、全局配置(Global Configuration)1. w

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

java计算机毕设课设—停车管理信息系统(附源码、文章、相关截图、部署视频)

这是什么系统? 资源获取方式在最下方 java计算机毕设课设—停车管理信息系统(附源码、文章、相关截图、部署视频) 停车管理信息系统是为了提升停车场的运营效率和管理水平而设计的综合性平台。系统涵盖用户信息管理、车位管理、收费管理、违规车辆处理等多个功能模块,旨在实现对停车场资源的高效配置和实时监控。此外,系统还提供了资讯管理和统计查询功能,帮助管理者及时发布信息并进行数据分析,为停车场的科学

CSP-J基础之数学基础 初等数论 一篇搞懂(一)

文章目录 前言声明初等数论是什么初等数论历史1. **古代时期**2. **中世纪时期**3. **文艺复兴与近代**4. **现代时期** 整数的整除性约数什么样的整数除什么样的整数才能得到整数?条件:举例说明:一般化: 判断两个数能否被整除 因数与倍数质数与复合数使用开根号法判定质数哥德巴赫猜想最大公因数与辗转相除法计算最大公因数的常用方法:举几个例子:例子 1: 计算 12 和 18

文章解读与仿真程序复现思路——电力自动化设备EI\CSCD\北大核心《考虑燃料电池和电解槽虚拟惯量支撑的电力系统优化调度方法》

本专栏栏目提供文章与程序复现思路,具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源程序擅长文章解读,论文与完整源程序,等方面的知识,电网论文源程序关注python

CSP-J基础之数学基础 初等数论 一篇搞懂(二)

文章目录 前言算术基本定理简介什么是质数?举个简单例子:重要的结论:算术基本定理公式解释:举例: 算术基本定理的求法如何找出质因数:举个简单的例子: 重要的步骤:C++实现 同余举个例子:同余的性质简介1. 同余的自反性2. 同余的对称性3. 同余的传递性4. 同余的加法性质5. 同余的乘法性质 推论 总结 前言 在计算机科学和数学中,初等数论是一个重要的基础领域,涉及到整数

【Linux】萌新看过来!一篇文章带你走进Linux世界

🚀个人主页:奋斗的小羊 🚀所属专栏:Linux 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 前言💥1、初识Linux💥1.1 什么是操作系统?💥1.2 各种操作系统对比💥1.3 现代Linux应用💥1.4 Linux常用版本 💥2、Linux 和 Windows 目录结构对比💥2.1 文件系统组织方式💥2.2

多线程的系列文章

Java多线程学习(一)Java多线程入门 Java多线程学习(二)synchronized关键字(1)   Java多线程学习(二)synchronized关键字(2) Java多线程学习(三)volatile关键字 Java多线程学习(四)等待/通知(wait/notify)机制 Java多线程学习(五)线程间通信知识点补充 Java多线程学习(六)Lock锁的使用 Java多

缓存的常见问题 以及解决博客文章

1.jedispool 连 redis 高并发卡死  (子非鱼yy) https://blog.csdn.net/ztx114/article/details/78291734 2. Redis安装及主从配置 https://blog.csdn.net/ztx114/article/details/78320193 3.Spring中使用RedisTemplate操作Redis(sprin

java计算机毕设课设—企业员工信息管理系统(附源码、文章、相关截图、部署视频)

这是什么系统? 获取资料方式在最下方 java计算机毕设课设—企业员工信息管理系统(附源码、文章、相关截图、部署视频) 企业员工信息管理系统旨在为公司提供高效的员工信息管理解决方案。该系统的核心功能涵盖密码修改、员工管理、部门管理、出勤管理、工资管理、请假审核等方面,帮助企业优化人力资源管理流程。系统结构如下: (1)前端(员工端): 1.密码修改:员工可以修改自己的密码,提升账户的安全