java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque

本文主要是介绍java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

  • ConcurrentLinkedDeque 是什么?

  • 优缺点?

  • 应用场景?

  • 源码实现?

  • 个人启发?

引言

在并发编程中我们有时候需要使用线程安全的队列。

如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。

使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

ConcurrentLinkedDeque

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedDeque是一种基于链接节点的无限并发链表。可以安全地并发执行插入、删除和访问操作。当许多线程同时访问一个公共集合时,ConcurrentLinkedDeque是一个合适的选择。

和大多数其他并发的集合类型一样,这个类不允许使用空元素。

算法

它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息可以参见[http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf]http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf)。

注意的地方

与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。

同时,对链表的批量操作也不是一个原子操作,在使用的时候要注意,在API文档中这样表述:

批量的操作:包括添加、删除或检查多个元素,比如addAll(java.util.Collection<? extends E>)、removeIf(java.util.function.Predicate<? super E>)或者removeIf(java.util.function.Predicate<? super E>)或forEach(java.util.function.Consumer<? super E>)方法,这个类型并不保证以原子方式执行。

由此可见如果想保证原子访问,不得使用批量操作的方法。

线程安全的队列对比

我来分析设计一个线程安全的队列哪几种方法。

synchronized 同步队列

第一种:使用synchronized同步队列,就像Vector或者Collections.synchronizedList/Collection那样。
显然这不是一个好的并发队列,这会导致吞吐量急剧下降。

Lock 同步队列

第二种:使用Lock。一种好的实现方式是使用ReentrantReadWriteLock来代替ReentrantLock提高读取的吞吐量。
但是显然 ReentrantReadWriteLock的实现更为复杂,而且更容易导致出现问题,
另外也不是一种通用的实现方式,因为 ReentrantReadWriteLock适合哪种读取量远远大于写入量的场合。
当然了ReentrantLock是一种很好的实现,结合 Condition能够很方便的实现阻塞功能,
这在后面介绍BlockingQueue的时候会具体分析。

CAS 操作

第三种:使用CAS操作。尽管Lock的实现也用到了CAS操作,但是毕竟是间接操作,而且会导致线程挂起。
一个好的并发队列就是采用某种非阻塞算法来取得最大的吞吐量。

ConcurrentLinkedQueue采用的就是第三种策略。

它采用了参考资料1(http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf) 中的算法。

要使用非阻塞算法来完成队列操作,那么就需要一种“循环尝试”的动作,就是循环操作队列,直到成功为止,失败就会再次尝试。

简单实用例子

场景

向公共列表中插入元素。

示例编码

import java.util.concurrent.ConcurrentLinkedDeque;public class CLDMain {private static ConcurrentLinkedDeque<String> cld = new ConcurrentLinkedDeque<>();public static void main(String[] args) {int numThread = Runtime.getRuntime().availableProcessors();Thread[] threads = new Thread[numThread];for (int i = 0; i < threads.length; i++) {(threads[i] = new Thread(addTask(), "Thread "+i)).start();}}public static Runnable addTask() {return new Runnable() {@Overridepublic void run() {int num = Runtime.getRuntime().availableProcessors();for (int i = 0; i < num; i++) {StringBuilder item = new StringBuilder("Item ").append(i);cld.addFirst(item.toString());callbackAdd(Thread.currentThread().getName(), item);}callbackFinish(Thread.currentThread().getName());}};}public static void callbackAdd(String threadName, StringBuilder item) {StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);System.out.println(builder);}public static void callbackFinish(String threadName) {StringBuilder builder = new StringBuilder(threadName).append(" has Finished");System.out.println(builder);System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));}
}
  • 输出日志
Thread 0 added :Item 0
Thread 0 added :Item 1
Thread 0 added :Item 2
Thread 0 added :Item 3
Thread 0 has Finished
CurrentSize 6
Thread 1 added :Item 0
Thread 2 added :Item 0
Thread 2 added :Item 1
Thread 2 added :Item 2
Thread 2 added :Item 3
Thread 1 added :Item 1
Thread 1 added :Item 2
Thread 2 has Finished
Thread 1 added :Item 3
Thread 1 has Finished
CurrentSize 13
CurrentSize 13
Thread 3 added :Item 0
Thread 3 added :Item 1
Thread 3 added :Item 2
Thread 3 added :Item 3
Thread 3 has Finished
CurrentSize 16

简单分析

该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是ConcurrentLinkedDeque的典型使用场景。

同时也验证了上面的说法,即size()方法需要遍历链表,可能返回错误的结果。

源码分析

接口

/*** @since 1.7* @author Doug Lea* @author Martin Buchholz* @param <E> the type of elements held in this collection*/
public class ConcurrentLinkedDeque<E>extends AbstractCollection<E>implements Deque<E>, java.io.Serializable {

Node 节点定义

static final class Node<E> {volatile Node<E> prev;volatile E item;volatile Node<E> next;Node() {  // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR}/*** Constructs a new node.  Uses relaxed write because item can* only be seen after publication via casNext or casPrev.*/Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}void lazySetPrev(Node<E> val) {UNSAFE.putOrderedObject(this, prevOffset, val);}boolean casPrev(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long prevOffset;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

volatile 参见 volatile

UnSafe 参见 UnSafe

ps: 这里对 UnSafe 不熟练,看起来比较吃力。

基础属性

    private static final int HOPS = 2;private transient volatile Node<E> tail;private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;@SuppressWarnings("unchecked")Node<E> prevTerminator() {return (Node<E>) PREV_TERMINATOR;}@SuppressWarnings("unchecked")Node<E> nextTerminator() {return (Node<E>) NEXT_TERMINATOR;}

构造器

    /*** Constructs an empty deque.*/public ConcurrentLinkedDeque() {head = tail = new Node<E>(null);}/*** Constructs a deque initially containing the elements of* the given collection, added in traversal order of the* collection's iterator.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any*         of its elements are null*/public ConcurrentLinkedDeque(Collection<? extends E> c) {// Copy c into a private chain of NodesNode<E> h = null, t = null;for (E e : c) {checkNotNull(e);Node<E> newNode = new Node<E>(e);if (h == null)h = t = newNode;else {t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}initHeadTail(h, t);}

用到的方法

  • checkNotNull

校验元素不为 null。

ps: 这种方法竟然没有统一的方法类。。。

/*** Throws NullPointerException if argument is null.** @param v the element*/
private static void checkNotNull(Object v) {if (v == null)throw new NullPointerException();
}
  • initHeadTail

初始化头和尾

/*** Initializes head and tail, ensuring invariants hold.*/
private void initHeadTail(Node<E> h, Node<E> t) {if (h == t) {if (h == null)h = t = new Node<E>(null);else {// Avoid edge case of a single Node with non-null item.Node<E> newNode = new Node<E>(null);t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}head = h;tail = t;
}

添加元素

和原来类似,也分为头尾。也有 add/offer。本系列只选取一个来看,后面都是如此。

    /*** Links e as first element.*/private void linkFirst(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);restartFromHead:for (;;)for (Node<E> h = head, p = h, q;;) {if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first nodenewNode.lazySetNext(p); // CAS piggybackif (p.casPrev(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a timecasHead(h, newNode);  // Failure is OK.return;}// Lost CAS race to another thread; re-read prev}}}

restartFromHead: 这里应该是一个 goto 的语法,用在合适的地方就是好语法。

这里用了 CAS 来保证线程安全性。

cas 方法

private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

移除元素

public E pollLast() {for (Node<E> p = last(); p != null; p = pred(p)) {E item = p.item;if (item != null && p.casItem(item, null)) {unlink(p);return item;}}return null;
}

私有方法

  • unlink
    /*** Unlinks non-null node x.*/void unlink(Node<E> x) {// assert x != null;// assert x.item == null;// assert x != PREV_TERMINATOR;// assert x != NEXT_TERMINATOR;final Node<E> prev = x.prev;final Node<E> next = x.next;if (prev == null) {unlinkFirst(x, next);} else if (next == null) {unlinkLast(x, prev);} else {// Unlink interior node.//// This is the common case, since a series of polls at the// same end will be "interior" removes, except perhaps for// the first one, since end nodes cannot be unlinked.//// At any time, all active nodes are mutually reachable by// following a sequence of either next or prev pointers.//// Our strategy is to find the unique active predecessor// and successor of x.  Try to fix up their links so that// they point to each other, leaving x unreachable from// active nodes.  If successful, and if x has no live// predecessor/successor, we additionally try to gc-unlink,// leaving active nodes unreachable from x, by rechecking// that the status of predecessor and successor are// unchanged and ensuring that x is not reachable from// tail/head, before setting x's prev/next links to their// logical approximate replacements, self/TERMINATOR.Node<E> activePred, activeSucc;boolean isFirst, isLast;int hops = 1;// Find active predecessorfor (Node<E> p = prev; ; ++hops) {if (p.item != null) {activePred = p;isFirst = false;break;}Node<E> q = p.prev;if (q == null) {if (p.next == p)return;activePred = p;isFirst = true;break;}else if (p == q)return;elsep = q;}// Find active successorfor (Node<E> p = next; ; ++hops) {if (p.item != null) {activeSucc = p;isLast = false;break;}Node<E> q = p.next;if (q == null) {if (p.prev == p)return;activeSucc = p;isLast = true;break;}else if (p == q)return;elsep = q;}// TODO: better HOP heuristicsif (hops < HOPS// always squeeze out interior deleted nodes&& (isFirst | isLast))return;// Squeeze out deleted nodes between activePred and// activeSucc, including x.skipDeletedSuccessors(activePred);skipDeletedPredecessors(activeSucc);// Try to gc-unlink, if possibleif ((isFirst | isLast) &&// Recheck expected state of predecessor and successor(activePred.next == activeSucc) &&(activeSucc.prev == activePred) &&(isFirst ? activePred.prev == null : activePred.item != null) &&(isLast  ? activeSucc.next == null : activeSucc.item != null)) {updateHead(); // Ensure x is not reachable from headupdateTail(); // Ensure x is not reachable from tail// Finally, actually gc-unlinkx.lazySetPrev(isFirst ? prevTerminator() : x);x.lazySetNext(isLast  ? nextTerminator() : x);}}}

TODO:…

后续补充

参考资料

https://segmentfault.com/a/1190000013144544

聊聊并发(六)ConcurrentLinkedQueue的实现原理分析

https://blog.csdn.net/lifuxiangcaohui/article/details/8051144

目录

java多线程并发之旅-01-并发概览

这篇关于java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259373

相关文章

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

Java中的密码加密方式

《Java中的密码加密方式》文章介绍了Java中使用MD5算法对密码进行加密的方法,以及如何通过加盐和多重加密来提高密码的安全性,MD5是一种不可逆的哈希算法,适合用于存储密码,因为其输出的摘要长度固... 目录Java的密码加密方式密码加密一般的应用方式是总结Java的密码加密方式密码加密【这里采用的

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

解决mybatis-plus-boot-starter与mybatis-spring-boot-starter的错误问题

《解决mybatis-plus-boot-starter与mybatis-spring-boot-starter的错误问题》本文主要讲述了在使用MyBatis和MyBatis-Plus时遇到的绑定异常... 目录myBATis-plus-boot-starpythonter与mybatis-spring-b