java多线程并发之旅-17--双端队列之 LinkedBlockingDeque

2023-10-22 05:32

本文主要是介绍java多线程并发之旅-17--双端队列之 LinkedBlockingDeque,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

问题

  • LinkedBlockingDeque 是什么?

  • 优缺点?

  • 应用场景?

  • 源码实现?

  • 个人启发?

LinkedBlockingDeque

双向并发阻塞队列。

所谓双向是指可以从队列的头和尾同时操作,并发只是线程安全的实现,阻塞允许在入队出队不满足条件时挂起线程,这里说的队列是指支持FIFO/FILO实现的链表。

  1. 要想支持阻塞功能,队列的容量一定是固定的,否则无法在入队的时候挂起线程。也就是capacity是final类型的。

  2. 既然是双向链表,每一个结点就需要前后两个引用,这样才能将所有元素串联起来,支持双向遍历。也即需要prev/next两个引用。

  3. 双向链表需要头尾同时操作,所以需要first/last两个节点,当然可以参考LinkedList那样采用一个节点的双向来完成,那样实现起来就稍微麻烦点。

  4. 既然要支持阻塞功能,就需要锁和条件变量来挂起线程。这里使用一个锁两个条件变量来完成此功能。

优缺点

优点当然是功能足够强大,同时由于采用一个独占锁,因此实现起来也比较简单。所有对队列的操作都加锁就可以完成。同时独占锁也能够很好的支持双向阻塞的特性。

凡事有利必有弊。缺点就是由于独占锁,所以不能同时进行两个操作,这样性能上就大打折扣。从性能的角度讲LinkedBlockingDeque要比LinkedQueue要低很多,比CocurrentLinkedQueue就低更多了,这在高并发情况下就比较明显了。

前面分析足够多的Queue实现后,LinkedBlockingDeque的原理和实现就不值得一提了,无非是在独占锁下对一个链表的普通操作。

序列化

有趣的是此类支持序列化,但是Node并不支持序列化,因此fist/last就不能序列化,那么如何完成序列化/反序列化过程呢?

private void writeObject(java.io.ObjectOutputStream s)throws java.io.IOException {lock.lock();try {// Write out capacity and any hidden stuffs.defaultWriteObject();// Write out all elements in the proper order.for (Node<E> p = first; p != null; p = p.next)s.writeObject(p.item);// Use trailing null as sentinels.writeObject(null);} finally {lock.unlock();}
}private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();count = 0;first = null;last = null;// Read in all elements and place in queuefor (;;) {E item = (E)s.readObject();if (item == null)break;add(item);}
}

描述的是LinkedBlockingDeque序列化/反序列化的过程。序列化时将真正的元素写入输出流,最后还写入了一个null。读取的时候将所有对象列表读出来,如果读取到一个null就表示结束。这就是为什么写入的时候写入一个null的原因,因为没有将count写入流,所以就靠null来表示结束,省一个整数空间。

源码

接口

/*** @since 1.6* @author  Doug Lea* @param <E> the type of elements held in this collection*/
public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, java.io.Serializable {

双向链表节点

/** Doubly-linked list node class */
static final class Node<E> {/*** The item, or null if this node has been removed.*/E item;/*** One of:* - the real predecessor Node* - this Node, meaning the predecessor is tail* - null, meaning there is no predecessor*/Node<E> prev;/*** One of:* - the real successor Node* - this Node, meaning the successor is head* - null, meaning there is no successor*/Node<E> next;Node(E x) {item = x;}
}

基础属性

/*** Pointer to first node.* Invariant: (first == null && last == null) ||*            (first.prev == null && first.item != null)*/
transient Node<E> first;
/*** Pointer to last node.* Invariant: (first == null && last == null) ||*            (last.next == null && last.item != null)*/
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

构造器

    /*** Creates a {@code LinkedBlockingDeque} with a capacity of* {@link Integer#MAX_VALUE}.*/public LinkedBlockingDeque() {this(Integer.MAX_VALUE);}/*** Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.** @param capacity the capacity of this deque* @throws IllegalArgumentException if {@code capacity} is less than 1*/public LinkedBlockingDeque(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;}/*** Creates a {@code LinkedBlockingDeque} with a capacity of* {@link Integer#MAX_VALUE}, initially containing the elements of* the given collection, added in traversal order of the* collection's iterator.** @param c the collection of elements to initially contain* @throws NullPointerException if the specified collection or any*         of its elements are null*/public LinkedBlockingDeque(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock lock = this.lock;lock.lock(); // Never contended, but necessary for visibilitytry {for (E e : c) {if (e == null)throw new NullPointerException();if (!linkLast(new Node<E>(e)))throw new IllegalStateException("Deque full");}} finally {lock.unlock();}}

吐槽

默认竟然构造成最大整数,真是令人费解?

初始化线程安全保证

使用了 ReentrantLock 可互斥锁,来保证线程安全性。

看的出来,如果有元素为空,会直接抛出异常。

添加元素

    /*** @throws IllegalStateException if this deque is full* @throws NullPointerException {@inheritDoc}*/public void addFirst(E e) {if (!offerFirst(e))throw new IllegalStateException("Deque full");}/*** @throws IllegalStateException if this deque is full* @throws NullPointerException  {@inheritDoc}*/public void addLast(E e) {if (!offerLast(e))throw new IllegalStateException("Deque full");}/*** @throws NullPointerException {@inheritDoc}*/public boolean offerFirst(E e) {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {return linkFirst(node);} finally {lock.unlock();}}/*** @throws NullPointerException {@inheritDoc}*/public boolean offerLast(E e) {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {return linkLast(node);} finally {lock.unlock();}}/*** @throws NullPointerException {@inheritDoc}* @throws InterruptedException {@inheritDoc}*/public void putFirst(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkFirst(node))notFull.await();} finally {lock.unlock();}}/*** @throws NullPointerException {@inheritDoc}* @throws InterruptedException {@inheritDoc}*/public void putLast(E e) throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);final ReentrantLock lock = this.lock;lock.lock();try {while (!linkLast(node))notFull.await();} finally {lock.unlock();}}/*** @throws NullPointerException {@inheritDoc}* @throws InterruptedException {@inheritDoc}*/public boolean offerFirst(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (!linkFirst(node)) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}return true;} finally {lock.unlock();}}/*** @throws NullPointerException {@inheritDoc}* @throws InterruptedException {@inheritDoc}*/public boolean offerLast(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();Node<E> node = new Node<E>(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (!linkLast(node)) {if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}return true;} finally {lock.unlock();}}

linkFirst & linkLast

这里使用了 Condition 类来保证队列阻塞。

见 阻塞队列实现原理

    /*** Links node as first element, or returns false if full.*/private boolean linkFirst(Node<E> node) {// assert lock.isHeldByCurrentThread();if (count >= capacity)return false;Node<E> f = first;node.next = f;first = node;if (last == null)last = node;elsef.prev = node;++count;notEmpty.signal();return true;}/*** Links node as last element, or returns false if full.*/private boolean linkLast(Node<E> node) {// assert lock.isHeldByCurrentThread();if (count >= capacity)return false;Node<E> l = last;node.prev = l;last = node;if (first == null)first = node;elsel.next = node;++count;notEmpty.signal();return true;}

移除元素

    /*** @throws NoSuchElementException {@inheritDoc}*/public E removeFirst() {E x = pollFirst();if (x == null) throw new NoSuchElementException();return x;}/*** @throws NoSuchElementException {@inheritDoc}*/public E removeLast() {E x = pollLast();if (x == null) throw new NoSuchElementException();return x;}public E pollFirst() {final ReentrantLock lock = this.lock;lock.lock();try {return unlinkFirst();} finally {lock.unlock();}}public E pollLast() {final ReentrantLock lock = this.lock;lock.lock();try {return unlinkLast();} finally {lock.unlock();}}

unlinkLast & unlinkFirst

原理和 linkFirst 是类似的,仍然使用 Condition 保证阻塞。

    /*** Removes and returns first element, or null if empty.*/private E unlinkFirst() {// assert lock.isHeldByCurrentThread();Node<E> f = first;if (f == null)return null;Node<E> n = f.next;E item = f.item;f.item = null;f.next = f; // help GCfirst = n;if (n == null)last = null;elsen.prev = null;--count;notFull.signal();return item;}/*** Removes and returns last element, or null if empty.*/private E unlinkLast() {// assert lock.isHeldByCurrentThread();Node<E> l = last;if (l == null)return null;Node<E> p = l.prev;E item = l.item;l.item = null;l.prev = l; // help GClast = p;if (p == null)first = null;elsep.next = null;--count;notFull.signal();return item;}/*** Unlinks x.*/void unlink(Node<E> x) {// assert lock.isHeldByCurrentThread();Node<E> p = x.prev;Node<E> n = x.next;if (p == null) {unlinkFirst();} else if (n == null) {unlinkLast();} else {p.next = n;n.prev = p;x.item = null;// Don't mess with x's links.  They may still be in use by// an iterator.--count;notFull.signal();}}

获取元素

    public E takeFirst() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkFirst()) == null)notEmpty.await();return x;} finally {lock.unlock();}}public E takeLast() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lock();try {E x;while ( (x = unlinkLast()) == null)notEmpty.await();return x;} finally {lock.unlock();}}public E pollFirst(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {E x;while ( (x = unlinkFirst()) == null) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return x;} finally {lock.unlock();}}public E pollLast(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {E x;while ( (x = unlinkLast()) == null) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return x;} finally {lock.unlock();}}/*** @throws NoSuchElementException {@inheritDoc}*/public E getFirst() {E x = peekFirst();if (x == null) throw new NoSuchElementException();return x;}/*** @throws NoSuchElementException {@inheritDoc}*/public E getLast() {E x = peekLast();if (x == null) throw new NoSuchElementException();return x;}public E peekFirst() {final ReentrantLock lock = this.lock;lock.lock();try {return (first == null) ? null : first.item;} finally {lock.unlock();}}public E peekLast() {final ReentrantLock lock = this.lock;lock.lock();try {return (last == null) ? null : last.item;} finally {lock.unlock();}}

这些代码大同小异,都是使用 ReentrantLock 保证线程安全性。

使用 Condition 保证阻塞性。

个人启发

  1. 使用 ReentrantLock 保证线程安全性。可以说掌握这个就掌握了大部分的同步容器。

  2. 使用 Condition 保证阻塞性,掌握这个就掌握了大部分的阻塞队列容器。

  3. 所有的容器都有优缺点。比如双向队列,就有对应的并发容器。我们要学习原理,化为自己所用。

参考资料

https://blog.csdn.net/vernonzheng/article/details/8267541

https://blog.csdn.net/qq_38293564/article/details/80592429

LinkedBlockingDeque源码学习

目录

java多线程并发之旅-01-并发概览

这篇关于java多线程并发之旅-17--双端队列之 LinkedBlockingDeque的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/259372

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2