本文主要是介绍一篇文章搞懂 SynchronousQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
本文隶属于专栏《100个问题搞定Java并发》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见100个问题搞定Java并发
正文
WHAT
在对 Executors 的介绍中,提到了一个非常特殊的等待队列 SynchronousQueue
详情请见我的这篇博客——Executors 源码解析(JDK8)
SynchronousQueue 的容量为 0 ,任何一个对 SynchronousQueue 的写需要等待一个对 SynchronousQueue 的读,反之亦然。
因此, SynchronousQueue 与其说是一个队列,不如说是一个数据交换通道
。
HOW
那 SynchronousQueue 的奇妙功能是如何实现的呢?
SynchronousQueue 和无锁的操作脱离不了关系,实际上 SynchronousQueue 内部也大量使用了无锁工具。
对 SynchronousQueue 来说,它将 put() 和 take() 两种功能截然不同的方法抽象为一个共同的方法 Transferer.transfer() 。
从字面上看,这就是数据传递的意思。
源码分析(JDK8)
/*** Shared internal API for dual stacks and queues.*/abstract static class Transferer<E> {/*** Performs a put or take.** @param e if non-null, the item to be handed to a consumer;* if null, requests that transfer return an item* offered by producer.* @param timed if this operation should timeout* @param nanos the timeout, in nanoseconds* @return if non-null, the item provided or received; if null,* the operation failed due to timeout or interrupt --* the caller can distinguish which of these occurred* by checking Thread.interrupted.*/abstract E transfer(E e, boolean timed, long nanos);}
当参数 e 为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。
timed 参数决定是否存在 timeout 时间, nanos 决定了 timeout 的时长。
如果返回值非空,则表示数据已经接受或者正常提供;如果为空,则表示失败(超时或者中断)。
SynchronousQueue 内部会维护一个线程等待队列。
等待队列中会保存等待线程及相关数据的信息。
比如,生产者将数据放入 SynchronousQueue 时,如果没有消费者接收,那么数据本身和线程对象都会打包在队列中等待(因为 SynchronousQueue 容积为 0 ,没有数据可以正常放入)。
3 步
Transferer.transfer() 函数的实现是 SynchronousQueue 的核心,它大体上分为三个步骤。
- 如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。 比如,等待队列中是读线程等待,本次操作也是读,因此这两个读都需要等待。 进入等待队列的线程可能会被挂起,它们会等待一个“匹配”操作
- 如果等待队列中的元素和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让它“匹配”到一个等待节点上。 接着弹出这两个节点,并且使得对应的两个线程继续执行。
- 如果线程发现等待队列的节点就是“完成”节点,那么帮助这个节点完成任务,其流程和步骤2是一致的。
步骤 1 的实现如下:
SNode h = head;if (h == null || h.mode == mode) { // empty or same-modeif (timed && nanos <= 0) { // can't waitif (h != null && h.isCancelled())casHead(h, h.next); // pop cancelled nodeelsereturn null;} else if (casHead(h, s = snode(s, e, h, mode))) {SNode m = awaitFulfill(s, timed, nanos);if (m == s) { // wait was cancelledclean(s);return null;}if ((h = head) != null && h.next == s)casHead(h, s.next); // help s's fulfillerreturn (E) ((mode == REQUEST) ? m.item : s.item);}}
在上述代码中,第 1 行 SNode 表示等待队列中的节点。
内部封装了当前线程、 next 节点、匹配节点、数据内容等信息。
第 2 行,判断当前等待队列为空,或者队列中元素的模式与本次操作相同(比如,都是读操作,那么都必须要等待)。
第 8 行,生成一个新的节点并置于队列头部,这个节点就代表当前线程。
如果入队成功,则执行第 9 行 awaitFulfill() 函数。
该函数会进行自旋等待,并最终挂起当前线程。
直到一个与之对应的操作产生,将其唤醒。
线程被唤醒后(表示已经读取到数据或者自己产生的数据己经被别的线程读取),在第 14 ~ 15 行尝试帮助对应的线程完成两个头部节点的出队操作(这仅仅是友情帮助),并在最后返回读取或者写入的数据(第 16 行)。
步骤 2 的实现如下:
else if (!isFulfilling(h.mode)) { // try to fulfillif (h.isCancelled()) // already cancelledcasHead(h, h.next); // pop and retryelse if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for (;;) { // loop until matched or waiters disappearSNode m = s.next; // m is s's matchif (m == null) { // all waiters are gonecasHead(s, null); // pop fulfill nodes = null; // use new node next timebreak; // restart main loop}SNode mn = m.next;if (m.tryMatch(s)) {casHead(s, mn); // pop both s and mreturn (E) ((mode == REQUEST) ? m.item : s.item);} else // lost matchs.casNext(m, mn); // help unlink}}}
在上述代码中,首先判断头部节点是否处于 fullfill 模式。
如果是,则需要进入步骤3。
否则,将视自己为对应的 fullfill 线程。
第 4 行生成一个 SNode 元素,设置为 fullfill 模式并将其压入队列头部。
接着,设置 m (原始的队列头部)为 s 的匹配节点(第 13 行),这个 tryMatch() 方法将会激活一个等待线程,并将 m 传递给那个线程。
如果设置成功,则表示数据投递完成,将 s 和 m 两个节点弹出即可(第 14 行)。
如果 tryMatch ()方法失败,则表示已经有其他线程帮助完成了操作,那么删除 m 节点即可(第 17 行),因为这个节点的数据已经被投递,不需要再次处理,再次跳转到第 5 行的循环体,进行下一个等待线程的匹配和数据投递,直到队列中没有等待线程为止。
步骤 3 的实现如下:
如果线程在执行时,发现头部元素恰好是 fulfill 模式,它就会帮助 fulfill 节点尽快被执行。
else { // help a fulfillerSNode m = h.next; // m is h's matchif (m == null) // waiter is gonecasHead(h, null); // pop fulfilling nodeelse {SNode mn = m.next;if (m.tryMatch(h)) // help matchcasHead(h, mn); // pop both h and melse // lost matchh.casNext(m, mn); // help unlink}}
上述代码的执行原理和步骤2完全一致。
唯一的不同是步骤3不会返回,因为步骤3所进行的工作是帮助其他线程尽快投递它们的数据,而自己并没有完成对应的操作。
因此,线程进入步骤3后,再次进入大循环体(代码中没有给出),从步骤1开始重新判断条件和投递数据。
从整个数据投递的过程中可以看到,在 SynchronousQueue 中,参与工作的所有线程不仅仅是竟争资源的关系,更重要的是,它们彼此之间还会互相帮助。
在一个线程内部,可能会帮助其他线程完成它们的工作。
这种模式可以在更大程度上减少饥饿的可能,提高系统整体的并行度。
这篇关于一篇文章搞懂 SynchronousQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!