AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

本文主要是介绍AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

1、CountDownLatch 简介

CountDownLatch,是一个简单的同步器,它的含义是 允许一个或者多个线程等待其他线程的操作执行完毕后再执行后续的操作

CountDownLatch 的通常用法和 Thread.join() 有点类似,等待其他线程都完成后再执行主任务。

2、入门案例分析

案例1

  • 对于像我一样的学生来说,CountDownLatch 的实际开发应用很少,甚至有同学没有接触过它。但是在并发条件下,这个类的使用还是很常见的,所以先引入两个案例去了解下它的用途:
  • 借助 CountDownLatch ,控制主线程等待子线程完成再执行
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest01 {private static final int TASK_COUNT = 8;private static final int THREAD_CORE_SIZE = 10;public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);Executor executor = Executors.newFixedThreadPool(10);for (int i = 0; i < TASK_COUNT; i++) {executor.execute(new WorkerRunnable(i, countDownLatch));}System.out.println("主线程正在等待所有子任务完成...");long mainWaitStartTimeMillis = System.currentTimeMillis();countDownLatch.await();long mainWaitEndTimeMillis = System.currentTimeMillis();System.out.println("主线程等待时长:"+ (mainWaitEndTimeMillis - mainWaitStartTimeMillis));}static class WorkerRunnable implements Runnable{private int taskId;private CountDownLatch latch;@Overridepublic void run() {doWorker();}public void doWorker(){System.out.println("任务ID:"+ taskId + ",任务正在进行中...");try {TimeUnit.MILLISECONDS.sleep(500);}catch (Exception e){e.printStackTrace();}finally {latch.countDown();}}public WorkerRunnable(int taskId, CountDownLatch latch) {this.taskId = taskId;this.latch = latch;}}
}

运行结果如下

H4fJ2T.png

案例2

  • 执行任务的线程,也可能是多对多的关系:本案例就来了解一下,借助 CountDownLatch,使得主线程控制子线程同时开启后,主线程再去阻塞等待子线程结束!
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest02 {public static void main(String[] args) throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(10);for (int i = 0; i < 10; i++) {new Thread(new Worker(startSignal, doneSignal, i)).start();}// 这里让主线程休眠 500 毫秒,确保所有子线程已经启动,并且阻塞在 startSignal 栅栏处TimeUnit.MILLISECONDS.sleep(500);// 因为 startSignal 栅栏值为1,所以主线程只需要调用一次// 那么所有调用 startSignal.await() 阻塞的子线程,就都可以同时通过栅栏了System.out.println("子任务栅栏已经开始");startSignal.countDown();System.out.println("等待子任务结束...");long startTime = System.currentTimeMillis();// 等待所有子任务结束doneSignal.await();long endTime = System.currentTimeMillis();System.out.println("所有子任务已经结束,耗时:" + (endTime - startTime));}static class Worker implements Runnable{private final CountDownLatch startSignal;private final CountDownLatch doneSignal;private int id;@Overridepublic void run() {try {// 为了让所有线程同时开启任务,我们让所有线程先阻塞在这里// 等大家都准备好了,再打开这个门槛startSignal.await();System.out.println("子任务-" + id + ",开启时间:" + System.currentTimeMillis());doWorker();}catch (Exception e){e.printStackTrace();}finally {doneSignal.countDown();}}public void doWorker() throws InterruptedException{TimeUnit.SECONDS.sleep(5);}public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {this.startSignal = startSignal;this.doneSignal = doneSignal;this.id = id;}}
}

执行结果

H4f2se.png

上面代码中 startSignal.await();就相当于一个栅栏,把所有子线程都抵挡在他们的 run方法,等待主线程执行 startSignal.countDown();即关闭栅栏之后,所有子线程同时继续执行他们自己的 run() 方法,如下图:

H4fbQS.png

案例3

/*** @author wcc* @date 2022/2/16 14:14*/
public class CountDownLatchTest03 {public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(2);Thread t1 = new Thread(() -> {try {Thread.sleep(5000);}catch (Exception e){}// 休息 5 秒钟后(模拟工作线程工作了 5 秒),调用 countDown()latch.countDown();}, "t1");Thread t2 = new Thread(() -> {try {Thread.sleep(10000);}catch (Exception e){}// 休息 10 秒钟后(模拟工作线程工作了 10 秒),调用 countDown()latch.countDown();}, "t2");t1.start();t2.start();Thread t3 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t3 从 await 中返回了");}catch (Exception e){System.out.println("线程 t3  await 被中断");Thread.currentThread().interrupt();}}, "t3");Thread t4 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t4 从 await 中返回了");}catch (Exception e){System.out.println("线程 t4  await 被中断");Thread.currentThread().interrupt();}}, "t4");t3.start();t4.start();}
}

执行结果如下

H4fXZj.png

3、源码分析

3.1、Sync 内部类

private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//传入初始count次数Sync(int count) {// 调用 setState()方法设置AQS中的 state 的值setState(count);}//获取还剩下的count次数int getCount() {return getState();}//尝试获取共享锁protected int tryAcquireShared(int acquires) {// 注意,这里state等于0的时候返回的是1// state不等于0的时候返回的是-1,也就是说state不等于0的时候总是要排队return (getState() == 0) ? 1 : -1;}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}
}

Sync 内部类重写了 tryReleaseShared(int releases)tryAcquireShared(int acquires) 方法,并把 count 存到 state 变量中去。这里要注意一下,上面两个方法的参数并没有被用到。

3.2、构造方法

// 构造方法需要传入一个 count,也就是初始次数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}

3.3、await() 方法

await() 方法是等待其他线程完成的方法,它会先尝试获取一下共享锁,如果失败则进入 AQS 的阻塞队列中排队等待被唤醒。

根据上面的 Sync 的源码,我们知道,state 不等于 0 的时候 tryAcquireShared() 返回的是 -1,也就是说 count 未减到 0 的时候,所有调用 await() 方法的线程都要排队。

public void await() throws InterruptedException {// 调用 AQS 的acquireSharedInterruptibly() 方法sync.acquireSharedInterruptibly(1);
}

AQS 中的 acquireSharedInterruptibly 方法:

// 位于AQS 中:可以响应中断获取共享锁的方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 条件成立:说明当前调用 await 方法的线程已经是中断状态了,直接抛出异常if (Thread.interrupted())throw new InterruptedException();// 条件成立:说明当前 AQS 的 state 是大于0的,此时将线程入队,然后走唤醒操作// 条件不成立: AQS.state == 0,此时就不会阻塞线程了...// 对应业务层面执行任务的线程已经将latch打破了,然后其他在调用latch.await的线程就不会在这里阻塞了if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 将调用 latch.await() 方法的线程包装成 node 加入到AQS 的阻塞队列中final Node node = addWaiter(Node.SHARED);// false:表示当前线程没有被中断,未抛出中断异常,不需要进行响应中断出队的逻辑// true:表示当前线程被中断了,且抛出中断异常,需要进行取消指定node参与竞争的逻辑boolean failed = true;try {// 自旋操作for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 条件成立:说明当前线程对应的节点为 head.next 节点if (p == head) {// head.next 节点就有权力获取共享锁了int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// shouldParkAfterFailedAcquire():会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 -1(SIGNAL),最终这个方法返回 trueif (shouldParkAfterFailedAcquire(p, node) &&// parkAndCheckInterrupt():挂起当前线程,并返回当前线程的中断标记parkAndCheckInterrupt())throw new InterruptedException();}} finally {// 条件成立:说明当前线程发生了中断,需要进行取消指定当前node线程参与竞争if (failed)cancelAcquire(node);}}/*** AQS 的setHeadAndPropagate方法 设置当前节点为head节点,并且向后传播(依次唤醒)* @param node* @param propagate 1:代表当前共享锁的state==0,-1:代表当前共享锁状态 state != 0*/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置当前节点为新的head节点,并设置thread、prev为nullsetHead(node);// 调用 setHeadAndPropagate的时候 propagate == 1 一定成立if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 获取当前节点的后继节点Node s = node.next;// 条件一:s== null 什么时候成立:当前 node 节点已经是 tail了,这时候条件一会成立,调用 doReleaseShared会处理这种情况// 条件二:前置条件:s != null 要求后继节点s的模式为共享模式 SHAREDif (s == null || s.isShared())// 基本上所有情况都会执行到 doReleaseShared 方法doReleaseShared();}}
}

图解分析

H45EP1.png

3.4、countDown() 方法

countDown() 方法,会释放共享锁,也就是 count 的次数会减1.

根据上面 Sync 的源码,我们知道,tryReleaseShared() 每次会把 count 的次数减1,当其减为0的时候返回 true,这时候才唤醒等待的线程。

注意,doReleaseShared() 是唤醒等待的线程,这个方法我们在前面的章节中分析过了。

public void countDown() {sync.releaseShared(1);}// 释放共享锁的方法public final boolean releaseShared(int arg) {// 条件成立:说明当前调用 latch.countDown() 方法的线程,正好是 state - 1 == 0的线程,需要触发唤醒 await状态的线程if (tryReleaseShared(arg)) {// 调用countDown()方法的线程只有一个线程会进入到这个 if 块里面,调用 doReleaseShared(),唤醒阻塞状态线程的逻辑doReleaseShared();return true;}return false;}/*** 都有哪几种路径会调用到 doReleaseShared方法呢?* 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 唤醒当前阻塞队列内head.next对应的线程* 2.被唤醒的线程 -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared()*/// AQS 的 doReleaseShared() 方法private void doReleaseShared() {for (;;) {// 获取当前 AQS 内的头结点Node h = head;// 条件一:h != null 成立:说明阻塞队列不为空// 不成立:h == null,什么时候会是这样呢?// latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 条件二:h != tail,当前在阻塞队列内除了head节点以外还有其他节点// h == tail -> head 和 tail 指向的是同一个node对象 什么时候会有这种情况呢?// 1.正常唤醒情况,依次获取到共享锁,当前线程执行到这里的时候(线程就是tail节点)// 2.第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了//  因为await()线程是第一个调用 latch.await() 的线程,此时队列内什么也没有,它需要补充创建一个head节点,然后再次自旋入队//  在await()线程入队完成之前,假设当前队列内只有刚刚补充创建的空元素head//  同一时期,外部有一个调用 countDown() 的线程,它将state的值从1修改为0了,这个线程需要做唤醒阻塞队列内元素的逻辑// 注意:调用 await() 方法的线程,因为完全入队完成之后再次回到上层方法doAcquireSharedInterruptibly中,会进入到自旋中...// 自旋中会获取当前元素的前驱,判断自己是head.next,所有接下来改线程又会将自己设置为head,然后并没有把当前线程中断if (h != null && h != tail) {// 执行到这个if里面,说明当前head一定有后继节点// 获取头结点head的等待状态int ws = h.waitStatus;// 如果当前head节点状态为 signal 说明当前后继节点并没有被唤醒过呢if (ws == Node.SIGNAL) {// 唤醒后继节点前,将当前head节点的状态改为0// 这里,为什么使用CAS 操作呢?// 这里,是因为当前节点唤醒后继节点的时候,后继节点更新了自己为head节点,导致当前节点无法退出自旋,然后会再次参与到唤醒其后继节点的后继节点的逻辑中// 所以,此时是有并发的,要使用CAS逻辑if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases//唤醒后继节点unparkSuccessor(h);}//执行到这里,说明当前头结点的等待状态不是SIGNALelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 条件成立:// 1.说明刚刚唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的,设置当前唤醒节点为head的逻辑// 这个时候,当前线程直接跳出去就结束了...// 此时,用不用担心唤醒逻辑在这里断掉呢?// 不需要担心,因为被唤醒的线程早晚会执行到 doReleaseShared方法中// 2.head == null//  latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 3.head == tail 第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了 head 和 tail 指向的是同一个对象// 条件不成立:// 被唤醒的节点非常积极,然后直接在上层方法doAcquireSharedInterruptibly中被被唤醒,直接将自己设置为了新的head节点// 此时唤醒它的节点(前驱节点)执行 h == head 导致条件不成立// 此时 head 节点的前驱不会跳出 doReleaseShared,会继续参与唤醒新head节点的后继节点逻辑中去if (h == head)                   // loop if head changedbreak;}}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}}

CountDowmnLatch.countDown() 执行流程图解:

H45oJ1.png
总结

  • CountDownLatch 表示允许一个或者多个线程等待其他线程的操作执行完成后再执行后续的操作
  • CountDownLatch 使用 AQS 的共享锁机制实现
  • CountDownLatch 初始化的时候需要传入次数 count(共享锁的锁的层数)
  • 每次调用 countDown() 方法的时候 count 的次数减1
  • 每次调用 await() 方法的时候都会尝试获取锁,这里的获取锁其实是检查 AQS 中的 state 值是否为0
  • 当 count 值(也就是 state 的值)减为0的时候会唤醒 AQS 中阻塞队列中的线程, 这些线程调用 await() 方法入队

这篇关于AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/423425

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud

线性因子模型 - 独立分量分析(ICA)篇

序言 线性因子模型是数据分析与机器学习中的一类重要模型,它们通过引入潜变量( latent variables \text{latent variables} latent variables)来更好地表征数据。其中,独立分量分析( ICA \text{ICA} ICA)作为线性因子模型的一种,以其独特的视角和广泛的应用领域而备受关注。 ICA \text{ICA} ICA旨在将观察到的复杂信号

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。