LinkedBlockingQueue源码学习

2024-04-23 15:18

本文主要是介绍LinkedBlockingQueue源码学习,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

首先来看一个例子,例子来源于网上:

/*** 多线程模拟实现生产者/消费者模型*  */
public class BlockingQueueTest2 {/*** * 定义装苹果的篮子* */public class Basket {// 篮子,能够容纳3个苹果BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);// 生产苹果,放入篮子public void produce() throws InterruptedException {// put方法放入一个苹果,若basket满了,等到basket有位置basket.put("An apple");}// 消费苹果,从篮子中取走public String consume() throws InterruptedException {// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)return basket.take();}}// 定义苹果生产者class Producer implements Runnable {private String instance;private Basket basket;public Producer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 生产苹果System.out.println("生产者准备生产苹果:" + instance);basket.produce();System.out.println("!生产者生产苹果完毕:" + instance);// 休眠300msThread.sleep(300);}} catch (InterruptedException ex) {System.out.println("Producer Interrupted");}}}// 定义苹果消费者class Consumer implements Runnable {private String instance;private Basket basket;public Consumer(String instance, Basket basket) {this.instance = instance;this.basket = basket;}public void run() {try {while (true) {// 消费苹果System.out.println("消费者准备消费苹果:" + instance);System.out.println(basket.consume());System.out.println("!消费者消费苹果完毕:" + instance);// 休眠1000msThread.sleep(1000);}} catch (InterruptedException ex) {System.out.println("Consumer Interrupted");}}}public static void main(String[] args) {BlockingQueueTest2 test = new BlockingQueueTest2();// 建立一个装苹果的篮子Basket basket = test.new Basket();ExecutorService service = Executors.newCachedThreadPool();Producer producer = test.new Producer("生产者001", basket);Producer producer2 = test.new Producer("生产者002", basket);Consumer consumer = test.new Consumer("消费者001", basket);service.submit(producer);service.submit(producer2);service.submit(consumer);// 程序运行5s后,所有任务停止
//        try {
//            Thread.sleep(1000 * 5);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
//        service.shutdownNow();}}

采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:

采用链表数据结构Node的方式进行节点数据的记录,

同时其进行入队和出队的计数器采用原子性的AtomicInteger

其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock

其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。

1.相关变量

//容量,为空时使用Integer.MAX_VALUE=2^31-1
private final int capacity;/** Current number of elements */
//计数,队列中的元素个数
private final AtomicInteger count = new AtomicInteger();//头结点,head.item==null,首节点不存放元素
transient Node<E> head;//尾节点,last.next==null
private transient Node<E> last;/** Lock held by take, poll, etc */
//消费队列锁
private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */
//消费队列等待消费,用于队满时,进行消费
private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */
//生产队列锁
private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */
//生产队列等待生产,用于队空时,进行生产
private final Condition notFull = putLock.newCondition();//节点信息:数据、后继点击
static class Node<E> {E item;/*** One of:* - the real successor Node* - this Node, meaning the successor is head.next* - null, meaning there is no successor (this is the last node)*///下一个节点,分为三种情况:// 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点Node<E> next;Node(E x) { item = x; }
}

2.构造方法

//构造方法,空参构造默认队列容量为2^31-1
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}//构造方法,带指定容量
public LinkedBlockingQueue(int capacity) {//对容量进行校验if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;//创建节点信息last = head = new Node<E>(null);
}//构造方法,放入带指定集合的元素信息入队
//首先采用默认大小,进行上锁操作,
// 放入元素到队列中,进行遍历,放入
public LinkedBlockingQueue(Collection<? extends E> c) {//默认队列大小,2^31-1this(Integer.MAX_VALUE);//进行上锁操作final ReentrantLock putLock = this.putLock;putLock.lock(); // Never contended, but necessary for visibilitytry {//放入元素,进行计数int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {//释放锁putLock.unlock();}
}

3.方法

生产方法

put

//入队操作
//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,
// 如果队列没有满,则进行生产操作,同时计数器进行计数
//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程
//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程
public void put(E e) throws InterruptedException {//非空校验if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.//设置计数为0,失败的时候返回int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//中断上锁putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*///检查队列是否满了,满了进行阻塞操作while (count.get() == capacity) {notFull.await();}//入队操作,将节点信息插入到队尾//last=last.next=nodeenqueue(node);c = count.getAndIncrement();//元素没有满,则唤醒被阻塞的线程,增加线程if (c + 1 < capacity)notFull.signal();} finally {//释放锁putLock.unlock();}//插入的是一个元素时唤醒阻塞等待的线程if (c == 0)signalNotEmpty();
}

offer

//阻塞带超时时间的offer操作
public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {//如果时间<0,则表示超时返回了,此时队列未满,直接返回if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}//否者进行入队操作enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;
}//首先进行非空校验,如果队满了,直接返回false
//如果没有满,则进行上锁,同时进行判断,
// 如果计数<容量,则进行入队操作
//最后释放锁
public boolean offer(E e) {if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;putLock.lock();try {if (count.get() < capacity) {enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;
}

消费者操作

take操作

//take操作 消费消息
//如果队列为非空或者被唤醒,进行消费操作,计数器-1
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}//开始消费if (c == capacity)signalNotFull();return x;
}
//进行消费操作 poll,带超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;
}//进行poll操作public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

remove操作:

//删除操作,释放指定节点信息
public boolean remove(Object o) {if (o == null) return false;//对生产消息和消费消息进行上锁fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {//释放节点unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}
}

drainTo操作

drainTo操作
public int drainTo(Collection<? super E> c) {return drainTo(c, Integer.MAX_VALUE);}//一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销//其中c和maxElement表示返回的集合、要获取的元素个数public int drainTo(Collection<? super E> c, int maxElements) {if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();if (maxElements <= 0)return 0;boolean signalNotFull = false;//进行上锁final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {//拿到两者之间的最小的一个int n = Math.min(maxElements, count.get());// count.get provides visibility to first n NodesNode<E> h = head;int i = 0;try {//将元素添加中集合c中while (i < n) {Node<E> p = h.next;c.add(p.item);p.item = null;h.next = h;h = p;++i;}return n;} finally {// Restore invariants even if c.add() threwif (i > 0) {// assert h.item == null;head = h;signalNotFull = (count.getAndAdd(-i) == capacity);}}} finally {takeLock.unlock();if (signalNotFull)signalNotFull();}}

这篇关于LinkedBlockingQueue源码学习的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/929150

相关文章

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL