java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque

本文主要是介绍java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

  • ConcurrentLinkedDeque 是什么?

  • 优缺点?

  • 应用场景?

  • 源码实现?

  • 个人启发?

引言

在并发编程中我们有时候需要使用线程安全的队列。

如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。

使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,本文让我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

ConcurrentLinkedDeque

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedDeque是一种基于链接节点的无限并发链表。可以安全地并发执行插入、删除和访问操作。当许多线程同时访问一个公共集合时,ConcurrentLinkedDeque是一个合适的选择。

和大多数其他并发的集合类型一样,这个类不允许使用空元素。

算法

它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改, Michael & Scott算法的详细信息可以参见[http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf]http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf)。

注意的地方

与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。

同时,对链表的批量操作也不是一个原子操作,在使用的时候要注意,在API文档中这样表述:

批量的操作:包括添加、删除或检查多个元素,比如addAll(java.util.Collection<? extends E>)、removeIf(java.util.function.Predicate<? super E>)或者removeIf(java.util.function.Predicate<? super E>)或forEach(java.util.function.Consumer<? super E>)方法,这个类型并不保证以原子方式执行。

由此可见如果想保证原子访问,不得使用批量操作的方法。

线程安全的队列对比

我来分析设计一个线程安全的队列哪几种方法。

synchronized 同步队列

第一种:使用synchronized同步队列,就像Vector或者Collections.synchronizedList/Collection那样。
显然这不是一个好的并发队列,这会导致吞吐量急剧下降。

Lock 同步队列

第二种:使用Lock。一种好的实现方式是使用ReentrantReadWriteLock来代替ReentrantLock提高读取的吞吐量。
但是显然 ReentrantReadWriteLock的实现更为复杂,而且更容易导致出现问题,
另外也不是一种通用的实现方式,因为 ReentrantReadWriteLock适合哪种读取量远远大于写入量的场合。
当然了ReentrantLock是一种很好的实现,结合 Condition能够很方便的实现阻塞功能,
这在后面介绍BlockingQueue的时候会具体分析。

CAS 操作

第三种:使用CAS操作。尽管Lock的实现也用到了CAS操作,但是毕竟是间接操作,而且会导致线程挂起。
一个好的并发队列就是采用某种非阻塞算法来取得最大的吞吐量。

ConcurrentLinkedQueue采用的就是第三种策略。

它采用了参考资料1(http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf) 中的算法。

要使用非阻塞算法来完成队列操作,那么就需要一种“循环尝试”的动作,就是循环操作队列,直到成功为止,失败就会再次尝试。

简单实用例子

场景

向公共列表中插入元素。

示例编码

import java.util.concurrent.ConcurrentLinkedDeque;public class CLDMain {private static ConcurrentLinkedDeque<String> cld = new ConcurrentLinkedDeque<>();public static void main(String[] args) {int numThread = Runtime.getRuntime().availableProcessors();Thread[] threads = new Thread[numThread];for (int i = 0; i < threads.length; i++) {(threads[i] = new Thread(addTask(), "Thread "+i)).start();}}public static Runnable addTask() {return new Runnable() {@Overridepublic void run() {int num = Runtime.getRuntime().availableProcessors();for (int i = 0; i < num; i++) {StringBuilder item = new StringBuilder("Item ").append(i);cld.addFirst(item.toString());callbackAdd(Thread.currentThread().getName(), item);}callbackFinish(Thread.currentThread().getName());}};}public static void callbackAdd(String threadName, StringBuilder item) {StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);System.out.println(builder);}public static void callbackFinish(String threadName) {StringBuilder builder = new StringBuilder(threadName).append(" has Finished");System.out.println(builder);System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));}
}
  • 输出日志
Thread 0 added :Item 0
Thread 0 added :Item 1
Thread 0 added :Item 2
Thread 0 added :Item 3
Thread 0 has Finished
CurrentSize 6
Thread 1 added :Item 0
Thread 2 added :Item 0
Thread 2 added :Item 1
Thread 2 added :Item 2
Thread 2 added :Item 3
Thread 1 added :Item 1
Thread 1 added :Item 2
Thread 2 has Finished
Thread 1 added :Item 3
Thread 1 has Finished
CurrentSize 13
CurrentSize 13
Thread 3 added :Item 0
Thread 3 added :Item 1
Thread 3 added :Item 2
Thread 3 added :Item 3
Thread 3 has Finished
CurrentSize 16

简单分析

该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是ConcurrentLinkedDeque的典型使用场景。

同时也验证了上面的说法,即size()方法需要遍历链表,可能返回错误的结果。

源码分析

接口

/*** @since 1.7* @author Doug Lea* @author Martin Buchholz* @param <E> the type of elements held in this collection*/
public class ConcurrentLinkedDeque<E>extends AbstractCollection<E>implements Deque<E>, java.io.Serializable {

Node 节点定义

static final class Node<E> {volatile Node<E> prev;volatile E item;volatile Node<E> next;Node() {  // default constructor for NEXT_TERMINATOR, PREV_TERMINATOR}/*** Constructs a new node.  Uses relaxed write because item can* only be seen after publication via casNext or casPrev.*/Node(E item) {UNSAFE.putObject(this, itemOffset, item);}boolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}void lazySetPrev(Node<E> val) {UNSAFE.putOrderedObject(this, prevOffset, val);}boolean casPrev(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long prevOffset;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = Node.class;prevOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("prev"));itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}}
}

volatile 参见 volatile

UnSafe 参见 UnSafe

ps: 这里对 UnSafe 不熟练,看起来比较吃力。

基础属性

    private static final int HOPS = 2;private transient volatile Node<E> tail;private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;@SuppressWarnings("unchecked")Node<E> prevTerminator() {return (Node<E>) PREV_TERMINATOR;}@SuppressWarnings("unchecked")Node<E> nextTerminator() {return (Node<E>) NEXT_TERMINATOR;}

构造器

    /*** Constructs an empty deque.*/public ConcurrentLinkedDeque() {head = tail = new Node<E>(null);}/*** Constructs a deque initially containing the elements of* the given collection, added in traversal order of the* collection's iterator.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any*         of its elements are null*/public ConcurrentLinkedDeque(Collection<? extends E> c) {// Copy c into a private chain of NodesNode<E> h = null, t = null;for (E e : c) {checkNotNull(e);Node<E> newNode = new Node<E>(e);if (h == null)h = t = newNode;else {t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}initHeadTail(h, t);}

用到的方法

  • checkNotNull

校验元素不为 null。

ps: 这种方法竟然没有统一的方法类。。。

/*** Throws NullPointerException if argument is null.** @param v the element*/
private static void checkNotNull(Object v) {if (v == null)throw new NullPointerException();
}
  • initHeadTail

初始化头和尾

/*** Initializes head and tail, ensuring invariants hold.*/
private void initHeadTail(Node<E> h, Node<E> t) {if (h == t) {if (h == null)h = t = new Node<E>(null);else {// Avoid edge case of a single Node with non-null item.Node<E> newNode = new Node<E>(null);t.lazySetNext(newNode);newNode.lazySetPrev(t);t = newNode;}}head = h;tail = t;
}

添加元素

和原来类似,也分为头尾。也有 add/offer。本系列只选取一个来看,后面都是如此。

    /*** Links e as first element.*/private void linkFirst(E e) {checkNotNull(e);final Node<E> newNode = new Node<E>(e);restartFromHead:for (;;)for (Node<E> h = head, p = h, q;;) {if ((q = p.prev) != null &&(q = (p = q).prev) != null)// Check for head updates every other hop.// If p == q, we are sure to follow head instead.p = (h != (h = head)) ? h : q;else if (p.next == p) // PREV_TERMINATORcontinue restartFromHead;else {// p is first nodenewNode.lazySetNext(p); // CAS piggybackif (p.casPrev(null, newNode)) {// Successful CAS is the linearization point// for e to become an element of this deque,// and for newNode to become "live".if (p != h) // hop two nodes at a timecasHead(h, newNode);  // Failure is OK.return;}// Lost CAS race to another thread; re-read prev}}}

restartFromHead: 这里应该是一个 goto 的语法,用在合适的地方就是好语法。

这里用了 CAS 来保证线程安全性。

cas 方法

private boolean casHead(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}private boolean casTail(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

移除元素

public E pollLast() {for (Node<E> p = last(); p != null; p = pred(p)) {E item = p.item;if (item != null && p.casItem(item, null)) {unlink(p);return item;}}return null;
}

私有方法

  • unlink
    /*** Unlinks non-null node x.*/void unlink(Node<E> x) {// assert x != null;// assert x.item == null;// assert x != PREV_TERMINATOR;// assert x != NEXT_TERMINATOR;final Node<E> prev = x.prev;final Node<E> next = x.next;if (prev == null) {unlinkFirst(x, next);} else if (next == null) {unlinkLast(x, prev);} else {// Unlink interior node.//// This is the common case, since a series of polls at the// same end will be "interior" removes, except perhaps for// the first one, since end nodes cannot be unlinked.//// At any time, all active nodes are mutually reachable by// following a sequence of either next or prev pointers.//// Our strategy is to find the unique active predecessor// and successor of x.  Try to fix up their links so that// they point to each other, leaving x unreachable from// active nodes.  If successful, and if x has no live// predecessor/successor, we additionally try to gc-unlink,// leaving active nodes unreachable from x, by rechecking// that the status of predecessor and successor are// unchanged and ensuring that x is not reachable from// tail/head, before setting x's prev/next links to their// logical approximate replacements, self/TERMINATOR.Node<E> activePred, activeSucc;boolean isFirst, isLast;int hops = 1;// Find active predecessorfor (Node<E> p = prev; ; ++hops) {if (p.item != null) {activePred = p;isFirst = false;break;}Node<E> q = p.prev;if (q == null) {if (p.next == p)return;activePred = p;isFirst = true;break;}else if (p == q)return;elsep = q;}// Find active successorfor (Node<E> p = next; ; ++hops) {if (p.item != null) {activeSucc = p;isLast = false;break;}Node<E> q = p.next;if (q == null) {if (p.prev == p)return;activeSucc = p;isLast = true;break;}else if (p == q)return;elsep = q;}// TODO: better HOP heuristicsif (hops < HOPS// always squeeze out interior deleted nodes&& (isFirst | isLast))return;// Squeeze out deleted nodes between activePred and// activeSucc, including x.skipDeletedSuccessors(activePred);skipDeletedPredecessors(activeSucc);// Try to gc-unlink, if possibleif ((isFirst | isLast) &&// Recheck expected state of predecessor and successor(activePred.next == activeSucc) &&(activeSucc.prev == activePred) &&(isFirst ? activePred.prev == null : activePred.item != null) &&(isLast  ? activeSucc.next == null : activeSucc.item != null)) {updateHead(); // Ensure x is not reachable from headupdateTail(); // Ensure x is not reachable from tail// Finally, actually gc-unlinkx.lazySetPrev(isFirst ? prevTerminator() : x);x.lazySetNext(isLast  ? nextTerminator() : x);}}}

TODO:…

后续补充

参考资料

https://segmentfault.com/a/1190000013144544

聊聊并发(六)ConcurrentLinkedQueue的实现原理分析

https://blog.csdn.net/lifuxiangcaohui/article/details/8051144

目录

java多线程并发之旅-01-并发概览

这篇关于java多线程并发之旅-18-双端队列之 ConcurrentLinkedDeque的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259373

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听