AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

本文主要是介绍AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析

1、CountDownLatch 简介

CountDownLatch,是一个简单的同步器,它的含义是 允许一个或者多个线程等待其他线程的操作执行完毕后再执行后续的操作

CountDownLatch 的通常用法和 Thread.join() 有点类似,等待其他线程都完成后再执行主任务。

2、入门案例分析

案例1

  • 对于像我一样的学生来说,CountDownLatch 的实际开发应用很少,甚至有同学没有接触过它。但是在并发条件下,这个类的使用还是很常见的,所以先引入两个案例去了解下它的用途:
  • 借助 CountDownLatch ,控制主线程等待子线程完成再执行
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest01 {private static final int TASK_COUNT = 8;private static final int THREAD_CORE_SIZE = 10;public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(TASK_COUNT);Executor executor = Executors.newFixedThreadPool(10);for (int i = 0; i < TASK_COUNT; i++) {executor.execute(new WorkerRunnable(i, countDownLatch));}System.out.println("主线程正在等待所有子任务完成...");long mainWaitStartTimeMillis = System.currentTimeMillis();countDownLatch.await();long mainWaitEndTimeMillis = System.currentTimeMillis();System.out.println("主线程等待时长:"+ (mainWaitEndTimeMillis - mainWaitStartTimeMillis));}static class WorkerRunnable implements Runnable{private int taskId;private CountDownLatch latch;@Overridepublic void run() {doWorker();}public void doWorker(){System.out.println("任务ID:"+ taskId + ",任务正在进行中...");try {TimeUnit.MILLISECONDS.sleep(500);}catch (Exception e){e.printStackTrace();}finally {latch.countDown();}}public WorkerRunnable(int taskId, CountDownLatch latch) {this.taskId = taskId;this.latch = latch;}}
}

运行结果如下

H4fJ2T.png

案例2

  • 执行任务的线程,也可能是多对多的关系:本案例就来了解一下,借助 CountDownLatch,使得主线程控制子线程同时开启后,主线程再去阻塞等待子线程结束!
/*** @author wcc* @date 2022/2/15 19:09*/
public class CountDownLatchTest02 {public static void main(String[] args) throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1);CountDownLatch doneSignal = new CountDownLatch(10);for (int i = 0; i < 10; i++) {new Thread(new Worker(startSignal, doneSignal, i)).start();}// 这里让主线程休眠 500 毫秒,确保所有子线程已经启动,并且阻塞在 startSignal 栅栏处TimeUnit.MILLISECONDS.sleep(500);// 因为 startSignal 栅栏值为1,所以主线程只需要调用一次// 那么所有调用 startSignal.await() 阻塞的子线程,就都可以同时通过栅栏了System.out.println("子任务栅栏已经开始");startSignal.countDown();System.out.println("等待子任务结束...");long startTime = System.currentTimeMillis();// 等待所有子任务结束doneSignal.await();long endTime = System.currentTimeMillis();System.out.println("所有子任务已经结束,耗时:" + (endTime - startTime));}static class Worker implements Runnable{private final CountDownLatch startSignal;private final CountDownLatch doneSignal;private int id;@Overridepublic void run() {try {// 为了让所有线程同时开启任务,我们让所有线程先阻塞在这里// 等大家都准备好了,再打开这个门槛startSignal.await();System.out.println("子任务-" + id + ",开启时间:" + System.currentTimeMillis());doWorker();}catch (Exception e){e.printStackTrace();}finally {doneSignal.countDown();}}public void doWorker() throws InterruptedException{TimeUnit.SECONDS.sleep(5);}public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, int id) {this.startSignal = startSignal;this.doneSignal = doneSignal;this.id = id;}}
}

执行结果

H4f2se.png

上面代码中 startSignal.await();就相当于一个栅栏,把所有子线程都抵挡在他们的 run方法,等待主线程执行 startSignal.countDown();即关闭栅栏之后,所有子线程同时继续执行他们自己的 run() 方法,如下图:

H4fbQS.png

案例3

/*** @author wcc* @date 2022/2/16 14:14*/
public class CountDownLatchTest03 {public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(2);Thread t1 = new Thread(() -> {try {Thread.sleep(5000);}catch (Exception e){}// 休息 5 秒钟后(模拟工作线程工作了 5 秒),调用 countDown()latch.countDown();}, "t1");Thread t2 = new Thread(() -> {try {Thread.sleep(10000);}catch (Exception e){}// 休息 10 秒钟后(模拟工作线程工作了 10 秒),调用 countDown()latch.countDown();}, "t2");t1.start();t2.start();Thread t3 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t3 从 await 中返回了");}catch (Exception e){System.out.println("线程 t3  await 被中断");Thread.currentThread().interrupt();}}, "t3");Thread t4 = new Thread(() -> {try {// 阻塞,等待 state 减为 0latch.await();System.out.println("线程 t4 从 await 中返回了");}catch (Exception e){System.out.println("线程 t4  await 被中断");Thread.currentThread().interrupt();}}, "t4");t3.start();t4.start();}
}

执行结果如下

H4fXZj.png

3、源码分析

3.1、Sync 内部类

private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;//传入初始count次数Sync(int count) {// 调用 setState()方法设置AQS中的 state 的值setState(count);}//获取还剩下的count次数int getCount() {return getState();}//尝试获取共享锁protected int tryAcquireShared(int acquires) {// 注意,这里state等于0的时候返回的是1// state不等于0的时候返回的是-1,也就是说state不等于0的时候总是要排队return (getState() == 0) ? 1 : -1;}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}
}

Sync 内部类重写了 tryReleaseShared(int releases)tryAcquireShared(int acquires) 方法,并把 count 存到 state 变量中去。这里要注意一下,上面两个方法的参数并没有被用到。

3.2、构造方法

// 构造方法需要传入一个 count,也就是初始次数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}

3.3、await() 方法

await() 方法是等待其他线程完成的方法,它会先尝试获取一下共享锁,如果失败则进入 AQS 的阻塞队列中排队等待被唤醒。

根据上面的 Sync 的源码,我们知道,state 不等于 0 的时候 tryAcquireShared() 返回的是 -1,也就是说 count 未减到 0 的时候,所有调用 await() 方法的线程都要排队。

public void await() throws InterruptedException {// 调用 AQS 的acquireSharedInterruptibly() 方法sync.acquireSharedInterruptibly(1);
}

AQS 中的 acquireSharedInterruptibly 方法:

// 位于AQS 中:可以响应中断获取共享锁的方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 条件成立:说明当前调用 await 方法的线程已经是中断状态了,直接抛出异常if (Thread.interrupted())throw new InterruptedException();// 条件成立:说明当前 AQS 的 state 是大于0的,此时将线程入队,然后走唤醒操作// 条件不成立: AQS.state == 0,此时就不会阻塞线程了...// 对应业务层面执行任务的线程已经将latch打破了,然后其他在调用latch.await的线程就不会在这里阻塞了if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 将调用 latch.await() 方法的线程包装成 node 加入到AQS 的阻塞队列中final Node node = addWaiter(Node.SHARED);// false:表示当前线程没有被中断,未抛出中断异常,不需要进行响应中断出队的逻辑// true:表示当前线程被中断了,且抛出中断异常,需要进行取消指定node参与竞争的逻辑boolean failed = true;try {// 自旋操作for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 条件成立:说明当前线程对应的节点为 head.next 节点if (p == head) {// head.next 节点就有权力获取共享锁了int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// shouldParkAfterFailedAcquire():会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 -1(SIGNAL),最终这个方法返回 trueif (shouldParkAfterFailedAcquire(p, node) &&// parkAndCheckInterrupt():挂起当前线程,并返回当前线程的中断标记parkAndCheckInterrupt())throw new InterruptedException();}} finally {// 条件成立:说明当前线程发生了中断,需要进行取消指定当前node线程参与竞争if (failed)cancelAcquire(node);}}/*** AQS 的setHeadAndPropagate方法 设置当前节点为head节点,并且向后传播(依次唤醒)* @param node* @param propagate 1:代表当前共享锁的state==0,-1:代表当前共享锁状态 state != 0*/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check below// 设置当前节点为新的head节点,并设置thread、prev为nullsetHead(node);// 调用 setHeadAndPropagate的时候 propagate == 1 一定成立if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 获取当前节点的后继节点Node s = node.next;// 条件一:s== null 什么时候成立:当前 node 节点已经是 tail了,这时候条件一会成立,调用 doReleaseShared会处理这种情况// 条件二:前置条件:s != null 要求后继节点s的模式为共享模式 SHAREDif (s == null || s.isShared())// 基本上所有情况都会执行到 doReleaseShared 方法doReleaseShared();}}
}

图解分析

H45EP1.png

3.4、countDown() 方法

countDown() 方法,会释放共享锁,也就是 count 的次数会减1.

根据上面 Sync 的源码,我们知道,tryReleaseShared() 每次会把 count 的次数减1,当其减为0的时候返回 true,这时候才唤醒等待的线程。

注意,doReleaseShared() 是唤醒等待的线程,这个方法我们在前面的章节中分析过了。

public void countDown() {sync.releaseShared(1);}// 释放共享锁的方法public final boolean releaseShared(int arg) {// 条件成立:说明当前调用 latch.countDown() 方法的线程,正好是 state - 1 == 0的线程,需要触发唤醒 await状态的线程if (tryReleaseShared(arg)) {// 调用countDown()方法的线程只有一个线程会进入到这个 if 块里面,调用 doReleaseShared(),唤醒阻塞状态线程的逻辑doReleaseShared();return true;}return false;}/*** 都有哪几种路径会调用到 doReleaseShared方法呢?* 1.latch.countDown() -> AQS.state == 0 -> doReleaseShared() 唤醒当前阻塞队列内head.next对应的线程* 2.被唤醒的线程 -> doAcquireSharedInterruptibly() -> setHeadAndPropagate() -> doReleaseShared()*/// AQS 的 doReleaseShared() 方法private void doReleaseShared() {for (;;) {// 获取当前 AQS 内的头结点Node h = head;// 条件一:h != null 成立:说明阻塞队列不为空// 不成立:h == null,什么时候会是这样呢?// latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 条件二:h != tail,当前在阻塞队列内除了head节点以外还有其他节点// h == tail -> head 和 tail 指向的是同一个node对象 什么时候会有这种情况呢?// 1.正常唤醒情况,依次获取到共享锁,当前线程执行到这里的时候(线程就是tail节点)// 2.第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了//  因为await()线程是第一个调用 latch.await() 的线程,此时队列内什么也没有,它需要补充创建一个head节点,然后再次自旋入队//  在await()线程入队完成之前,假设当前队列内只有刚刚补充创建的空元素head//  同一时期,外部有一个调用 countDown() 的线程,它将state的值从1修改为0了,这个线程需要做唤醒阻塞队列内元素的逻辑// 注意:调用 await() 方法的线程,因为完全入队完成之后再次回到上层方法doAcquireSharedInterruptibly中,会进入到自旋中...// 自旋中会获取当前元素的前驱,判断自己是head.next,所有接下来改线程又会将自己设置为head,然后并没有把当前线程中断if (h != null && h != tail) {// 执行到这个if里面,说明当前head一定有后继节点// 获取头结点head的等待状态int ws = h.waitStatus;// 如果当前head节点状态为 signal 说明当前后继节点并没有被唤醒过呢if (ws == Node.SIGNAL) {// 唤醒后继节点前,将当前head节点的状态改为0// 这里,为什么使用CAS 操作呢?// 这里,是因为当前节点唤醒后继节点的时候,后继节点更新了自己为head节点,导致当前节点无法退出自旋,然后会再次参与到唤醒其后继节点的后继节点的逻辑中// 所以,此时是有并发的,要使用CAS逻辑if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases//唤醒后继节点unparkSuccessor(h);}//执行到这里,说明当前头结点的等待状态不是SIGNALelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 条件成立:// 1.说明刚刚唤醒的后继节点还没有执行到setHeadAndPropagate方法里面的,设置当前唤醒节点为head的逻辑// 这个时候,当前线程直接跳出去就结束了...// 此时,用不用担心唤醒逻辑在这里断掉呢?// 不需要担心,因为被唤醒的线程早晚会执行到 doReleaseShared方法中// 2.head == null//  latch 创建出来后,没有任何线程调用过 await()方法之前,有线程调用 latch.countDown()操作,且触发了唤醒阻塞节点的逻辑// 3.head == tail 第一个调用 await() 方法的线程与调用countDown且触发唤醒阻塞节点的线程出现并发了 head 和 tail 指向的是同一个对象// 条件不成立:// 被唤醒的节点非常积极,然后直接在上层方法doAcquireSharedInterruptibly中被被唤醒,直接将自己设置为了新的head节点// 此时唤醒它的节点(前驱节点)执行 h == head 导致条件不成立// 此时 head 节点的前驱不会跳出 doReleaseShared,会继续参与唤醒新head节点的后继节点逻辑中去if (h == head)                   // loop if head changedbreak;}}/*** 尝试释放共享锁* 更新 AQS.state 的值,每调用一次,state 值减1,当 state - 1正好为0的时候,返回true*/protected boolean tryReleaseShared(int releases) {// 自旋操作,确保AQS.state 的值更新成功for (;;) {// 获取当前 state 的值int c = getState();// 条件成立:说明前面已经有线程触发唤醒操作了(已经释放共享锁,无法再释放了),这里返回falseif (c == 0)return false;// 执行到这里,说明 state > 0// 如果 c 的值 > 0,则将c值-1int nextc = c-1;// 原子更新 state CAS成功:说明当前线程执行 tryReleaseShared方法 c-1之前,没有其他线程修改过state的值//原子更新state的值:if (compareAndSetState(c, nextc))// nextc == 0:true:说明当前调用 countDown() 方法的线程就是需要触发唤醒操作的线程,此时会返回true进行唤醒操作return nextc == 0;}}

CountDowmnLatch.countDown() 执行流程图解:

H45oJ1.png
总结

  • CountDownLatch 表示允许一个或者多个线程等待其他线程的操作执行完成后再执行后续的操作
  • CountDownLatch 使用 AQS 的共享锁机制实现
  • CountDownLatch 初始化的时候需要传入次数 count(共享锁的锁的层数)
  • 每次调用 countDown() 方法的时候 count 的次数减1
  • 每次调用 await() 方法的时候都会尝试获取锁,这里的获取锁其实是检查 AQS 中的 state 值是否为0
  • 当 count 值(也就是 state 的值)减为0的时候会唤醒 AQS 中阻塞队列中的线程, 这些线程调用 await() 方法入队

这篇关于AbstractQueuedSynchronizer(AQS) 源码细致分析 - CountDownLatch 源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/423425

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专