java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque

本文主要是介绍java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

  • ConcurrentLinkedDeque 是什么?

  • 优缺点?

  • 应用场景?

  • 源码实现?

  • 个人启发?

引言

在并发编程中我们有时候需要使用线程安全的队列。

如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。

使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

ConcurrentLinkedDeque

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedDeque是一种基于链接节点的无限并发链表。可以安全地并发执行插入、删除和访问操作。当许多线程同时访问一个公共集合时,ConcurrentLinkedDeque是一个合适的选择。

和大多数其他并发的集合类型一样,这个类不允许使用空元素。

算法

它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息可以参见[http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf]http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf)。

注意的地方

与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。

同时,对链表的批量操作也不是一个原子操作,在使用的时候要注意,在API文档中这样表述:

批量的操作:包括添加、删除或检查多个元素,比如addAll(java.util.Collection<? extends E>)、removeIf(java.util.function.Predicate<? super E>)或者removeIf(java.util.function.Predicate<? super E>)或forEach(java.util.function.Consumer<? super E>)方法,这个类型并不保证以原子方式执行。

由此可见如果想保证原子访问,不得使用批量操作的方法。

线程安全的队列对比

我来分析设计一个线程安全的队列哪几种方法。

synchronized 同步队列

第一种:使用synchronized同步队列,就像Vector或者Collections.synchronizedList/Collection那样。
显然这不是一个好的并发队列,这会导致吞吐量急剧下降。

Lock 同步队列

第二种:使用Lock。一种好的实现方式是使用ReentrantReadWriteLock来代替ReentrantLock提高读取的吞吐量。
但是显然 ReentrantReadWriteLock的实现更为复杂,而且更容易导致出现问题,
另外也不是一种通用的实现方式,因为 ReentrantReadWriteLock适合哪种读取量远远大于写入量的场合。
当然了ReentrantLock是一种很好的实现,结合 Condition能够很方便的实现阻塞功能,
这在后面介绍BlockingQueue的时候会具体分析。

CAS 操作

第三种:使用CAS操作。尽管Lock的实现也用到了CAS操作,但是毕竟是间接操作,而且会导致线程挂起。
一个好的并发队列就是采用某种非阻塞算法来取得最大的吞吐量。

ConcurrentLinkedQueue采用的就是第三种策略。

它采用了参考资料1(http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf) 中的算法。

要使用非阻塞算法来完成队列操作,那么就需要一种“循环尝试”的动作,就是循环操作队列,直到成功为止,失败就会再次尝试。

简单实用例子

场景

向公共列表中插入元素。

示例编码

import java.util.concurrent.ConcurrentLinkedDeque;public class CLDMain {private static ConcurrentLinkedDeque<String> cld = new ConcurrentLinkedDeque<>();public static void main(String[] args) {int numThread = Runtime.getRuntime().availableProcessors();Thread[] threads = new Thread[numThread];for (int i = 0; i < threads.length; i++) {(threads[i] = new Thread(addTask(), "Thread "+i)).start();}}public static Runnable addTask() {return new Runnable() {@Overridepublic void run() {int num = Runtime.getRuntime().availableProcessors();for (int i = 0; i < num; i++) {StringBuilder item = new StringBuilder("Item ").append(i);cld.addFirst(item.toString());callbackAdd(Thread.currentThread().getName(), item);}callbackFinish(Thread.currentThread().getName());}};}public static void callbackAdd(String threadName, StringBuilder item) {StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);System.out.println(builder);}public static void callbackFinish(String threadName) {StringBuilder builder = new StringBuilder(threadName).append(" has Finished");System.out.println(builder);System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));}
}
  • 输出日志
Thread 0 added :Item 0
Thread 0 added :Item 1
Thread 0 added :Item 2
Thread 0 added :Item 3
Thread 0 has Finished
CurrentSize 6
Thread 1 added :Item 0
Thread 2 added :Item 0
Thread 2 added :Item 1
Thread 2 added :Item 2
Thread 2 added :Item 3
Thread 1 added :Item 1
Thread 1 added :Item 2
Thread 2 has Finished
Thread 1 added :Item 3
Thread 1 has Finished
CurrentSize 13
CurrentSize 13
Thread 3 added :Item 0
Thread 3 added :Item 1
Thread 3 added :Item 2
Thread 3 added :Item 3
Thread 3 has Finished
CurrentSize 16

简单分析

该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是ConcurrentLinkedDeque的典型使用场景。

同时也验证了上面的说法,即size()方法需要遍历链表,可能返回错误的结果。

源码分析

接口

/*** @since 1.7* @author Doug Lea* @author Martin Buchholz* @param <E> the type of elements held in this collection*/
public class ConcurrentLinkedDeque<E>extends AbstractCollection<E>implements Deque<E>, java.io.Serializable {

Node 节点定义

static final class Node<E> {volatile Node<E> prev;volatile E item;volatile Node<E> next;Node() {  // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR}/*** Constructs a new node.  Uses relaxed write because item can* only be seen after publication via casNext or casPrev.*/Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}void lazySetPrev(Node<E> val) {UNSAFE.putOrderedObject(this, prevOffset, val);}boolean casPrev(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long prevOffset;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

volatile 参见 volatile

UnSafe 参见 UnSafe

ps: 这里对 UnSafe 不熟练,看起来比较吃力。

基础属性

    private static final int HOPS = 2;private transient volatile Node<E> tail;private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;@SuppressWarnings("unchecked")Node<E> prevTerminator() {return (Node<E>) PREV_TERMINATOR;}@SuppressWarnings("unchecked")Node<E> nextTerminator() {return (Node<E>) NEXT_TERMINATOR;}

构造器

    /*** Constructs an empty deque.*/public ConcurrentLinkedDeque() {head = tail = new Node<E>(null);}/*** Constructs a deque initially containing the elements of* the given collection, added in traversal order of the* collection's iterator.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any*         of its elements are null*/public ConcurrentLinkedDeque(Collection<? extends E> c) {// Copy c into a private chain of NodesNode<E> h = null, t = null;for (E e : c) {checkNotNull(e);Node<E> newNode = new Node<E>(e);if (h == null)h = t = newNode;else {t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}initHeadTail(h, t);}

用到的方法

  • checkNotNull

校验元素不为 null。

ps: 这种方法竟然没有统一的方法类。。。

/*** Throws NullPointerException if argument is null.** @param v the element*/
private static void checkNotNull(Object v) {if (v == null)throw new NullPointerException();
}
  • initHeadTail

初始化头和尾

/*** Initializes head and tail, ensuring invariants hold.*/
private void initHeadTail(Node<E> h, Node<E> t) {if (h == t) {if (h == null)h = t = new Node<E>(null);else {// Avoid edge case of a single Node with non-null item.Node<E> newNode = new Node<E>(null);t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}head = h;tail = t;
}

添加元素

和原来类似,也分为头尾。也有 add/offer。本系列只选取一个来看,后面都是如此。

    /*** Links e as first element.*/private void linkFirst(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);restartFromHead:for (;;)for (Node<E> h = head, p = h, q;;) {if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first nodenewNode.lazySetNext(p); // CAS piggybackif (p.casPrev(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a timecasHead(h, newNode);  // Failure is OK.return;}// Lost CAS race to another thread; re-read prev}}}

restartFromHead: 这里应该是一个 goto 的语法,用在合适的地方就是好语法。

这里用了 CAS 来保证线程安全性。

cas 方法

private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

移除元素

public E pollLast() {for (Node<E> p = last(); p != null; p = pred(p)) {E item = p.item;if (item != null && p.casItem(item, null)) {unlink(p);return item;}}return null;
}

私有方法

  • unlink
    /*** Unlinks non-null node x.*/void unlink(Node<E> x) {// assert x != null;// assert x.item == null;// assert x != PREV_TERMINATOR;// assert x != NEXT_TERMINATOR;final Node<E> prev = x.prev;final Node<E> next = x.next;if (prev == null) {unlinkFirst(x, next);} else if (next == null) {unlinkLast(x, prev);} else {// Unlink interior node.//// This is the common case, since a series of polls at the// same end will be "interior" removes, except perhaps for// the first one, since end nodes cannot be unlinked.//// At any time, all active nodes are mutually reachable by// following a sequence of either next or prev pointers.//// Our strategy is to find the unique active predecessor// and successor of x.  Try to fix up their links so that// they point to each other, leaving x unreachable from// active nodes.  If successful, and if x has no live// predecessor/successor, we additionally try to gc-unlink,// leaving active nodes unreachable from x, by rechecking// that the status of predecessor and successor are// unchanged and ensuring that x is not reachable from// tail/head, before setting x's prev/next links to their// logical approximate replacements, self/TERMINATOR.Node<E> activePred, activeSucc;boolean isFirst, isLast;int hops = 1;// Find active predecessorfor (Node<E> p = prev; ; ++hops) {if (p.item != null) {activePred = p;isFirst = false;break;}Node<E> q = p.prev;if (q == null) {if (p.next == p)return;activePred = p;isFirst = true;break;}else if (p == q)return;elsep = q;}// Find active successorfor (Node<E> p = next; ; ++hops) {if (p.item != null) {activeSucc = p;isLast = false;break;}Node<E> q = p.next;if (q == null) {if (p.prev == p)return;activeSucc = p;isLast = true;break;}else if (p == q)return;elsep = q;}// TODO: better HOP heuristicsif (hops < HOPS// always squeeze out interior deleted nodes&& (isFirst | isLast))return;// Squeeze out deleted nodes between activePred and// activeSucc, including x.skipDeletedSuccessors(activePred);skipDeletedPredecessors(activeSucc);// Try to gc-unlink, if possibleif ((isFirst | isLast) &&// Recheck expected state of predecessor and successor(activePred.next == activeSucc) &&(activeSucc.prev == activePred) &&(isFirst ? activePred.prev == null : activePred.item != null) &&(isLast  ? activeSucc.next == null : activeSucc.item != null)) {updateHead(); // Ensure x is not reachable from headupdateTail(); // Ensure x is not reachable from tail// Finally, actually gc-unlinkx.lazySetPrev(isFirst ? prevTerminator() : x);x.lazySetNext(isLast  ? nextTerminator() : x);}}}

TODO:…

后续补充

参考资料

https://segmentfault.com/a/1190000013144544

聊聊并发(六)ConcurrentLinkedQueue的实现原理分析

https://blog.csdn.net/lifuxiangcaohui/article/details/8051144

目录

java多线程并发之旅-01-并发概览

这篇关于java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259373

相关文章

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码

SpringBoot利用@Validated注解优雅实现参数校验

《SpringBoot利用@Validated注解优雅实现参数校验》在开发Web应用时,用户输入的合法性校验是保障系统稳定性的基础,​SpringBoot的@Validated注解提供了一种更优雅的解... 目录​一、为什么需要参数校验二、Validated 的核心用法​1. 基础校验2. php分组校验3

Java Predicate接口定义详解

《JavaPredicate接口定义详解》Predicate是Java中的一个函数式接口,它代表一个判断逻辑,接收一个输入参数,返回一个布尔值,:本文主要介绍JavaPredicate接口的定义... 目录Java Predicate接口Java lamda表达式 Predicate<T>、BiFuncti

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA

Spring Security方法级安全控制@PreAuthorize注解的灵活运用小结

《SpringSecurity方法级安全控制@PreAuthorize注解的灵活运用小结》本文将带着大家讲解@PreAuthorize注解的核心原理、SpEL表达式机制,并通过的示例代码演示如... 目录1. 前言2. @PreAuthorize 注解简介3. @PreAuthorize 核心原理解析拦截与

一文详解JavaScript中的fetch方法

《一文详解JavaScript中的fetch方法》fetch函数是一个用于在JavaScript中执行HTTP请求的现代API,它提供了一种更简洁、更强大的方式来处理网络请求,:本文主要介绍Jav... 目录前言什么是 fetch 方法基本语法简单的 GET 请求示例代码解释发送 POST 请求示例代码解释

Java图片压缩三种高效压缩方案详细解析

《Java图片压缩三种高效压缩方案详细解析》图片压缩通常涉及减少图片的尺寸缩放、调整图片的质量(针对JPEG、PNG等)、使用特定的算法来减少图片的数据量等,:本文主要介绍Java图片压缩三种高效... 目录一、基于OpenCV的智能尺寸压缩技术亮点:适用场景:二、JPEG质量参数压缩关键技术:压缩效果对比

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++