多线程-AQS浅析

2024-05-12 15:48
文章标签 多线程 浅析 aqs

本文主要是介绍多线程-AQS浅析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1. AQS简介

2、AQS常用方法

  2.1 关于state的方法主要有一下三种

  2.2 自定义同步器实现时主要实现以下几种方法

  2.3 其余方法

3、CLH

3.1 CLH入列

3.2 CLH出列

4. 同步状态的获取与释放

4.1 获取

4.1.1 tryAcquire(int)

4.1.2 acquireQueued

4.2 释放


 

1. AQS简介

  • AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。
JUC包下的核心基础组件。也是实现大部分同步需求的基础。学习该组件是学习JUC绕不开的一块内容。
  • AQS解决了子类实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,所以使用AQS不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
  • 在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
  • AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
  • AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
  • AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
#同步队列
同步队列是AQS很重要的组成部分,它是一个双端队列,遵循FIFO原则,主要作用是用来存放在锁上阻塞的线程,
当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个Node节点假如到同步队列的尾部,队
列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。

2、AQS常用方法

  2.1 关于state的方法主要有一下三种

  • getState():返回同步状态的当前值;

  • setState(int newState):设置当前同步状态;

  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

  2.2 自定义同步器实现时主要实现以下几种方法

  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态

  • tryRelease(int arg):独占式释放同步状态;

  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

  • tryReleaseShared(int arg):共享式释放同步状态;

  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

  2.3 其余方法

  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

  • releaseShared(int arg):共享式释放同步状态;

3、CLH

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下

   其实就是个双端双向链表

   源码定义如下

	/*** AQS会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列*/static final class Node {/** 表明节点正在等待共享模式的标记 */static final Node SHARED = new Node();/** 表明节点正在等待独占模式的标记 */static final Node EXCLUSIVE = null;/** waitStatus值表明线程已取消 */static final int CANCELLED = 1;/** waitStatus值表示后续线程需要运行 */static final int SIGNAL = -1;/** waitStatus值表示当前节点在等待condition,也就是在condition队列中*//** 节点在等待队列中,节点线程在等待Condition,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步 */static final int CONDITION = -2;/*** waitStatus值指示下一个acquireShared应无条件地传播*/static final int PROPAGATE = -3;/*** 等待状态,仅接受以下值: * SIGNAL:* 		该节点的后继者(或将很快被)阻止(通过park),因此当前节点在释放或取消时必须取消其继承者* 		为了避免竞争,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞. * CANCELLED:* 		该节点由于超时或中断而被取消。 节点永远不会离开这个状态。特别地,具有被取消节点的线程再也不会被阻塞. * CONDITION:* 		此节点当前处于条件队列. 在传输之前,它不会用作同步队列节点,此时状态将设置为0.                 * PROPAGATE:*     releaseShared应该传播到其他节点.*     在doReleaseShared中设置(仅适用于头节点),以确保传播继续,即使其他操作已经干预.** 这些值以数字排列以简化使用. 非负值意味着节点不需要信号。所以,大多数代码不需要检查特定的值,只是为了符号。** 对于一般同步节点,该字段被初始化为0,条件节点的CONDITION被初始化 它使用CAS(或可能的话,无条件的易失性写入)进行修改。* CANCELLED 取消状态* SIGNAL 等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED* CONDITION 等待条件状态,在等待队列中* PROPAGATE 状态需要向后传播*/volatile int waitStatus;/*** 前驱节点:* 链接到当前节点/线程依赖于用于检查waitStatus的前导节点。 * 在排队期间分配,并且仅在出队时才被设为null(为了GC)。*/volatile Node prev;/***  后继节点:* 在排队期间分配,在绕过取消的前辈时进行调整,并在出局时为null排除(为了GC).* 看到一个空的下一个字段不一定意味着该节点在队列的结尾.* 因此,如果下一个字段看起来是空的,我们可以从尾部扫描prev来进行双重检查. * 被取消节点的下一个字段被设置为指向节点本身而不是null。*/volatile Node next;/*** 启动该节点的线程. 在构造器中初始化,并且使用后置为null*/volatile Thread thread;/*** 链接到下一个节点等待状态,或者特殊值SHARED.* 因为条件队列只能在独占模式下进行访问, 我们只需要一个简单的链接队列来保存节点,当他们在等待条件时. * 然后将它们转移到队列中以重新获取. * 并且因为条件只能是排他的,所以我们使用特殊的值来保存一个字段来表示共享模式.*/ Node nextWaiter;

可以看到AQS支持两种同步模式,分别是Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。这样方便使用者实现不同类型的同步组件。简而言之,AQS为使用者提供了多样的底层支撑,具体如何组装实现,使用者可以自由发挥。

3.1 CLH入列

CHL这种链表式结构入列,无非就是tail指向新节点、新节点的前驱节点指向当前最后的节点,当前最后一个节点的next指向当前节点,直接看源码相关操作在addWaiter(Node node)方法里。此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点  

	/*** 为当前线程和给定模式创建和排队节点。** @param mode*            Node.EXCLUSIVE用于独占,Node.SHARED用于共享* @return 新节点*/private Node addWaiter(Node mode) {//新建节点Node node = new Node(Thread.currentThread(), mode);// 快速尝试添加尾节点Node pred = tail;if (pred != null) {//新节点的前驱节点为尾节点node.prev = pred;//比较并且设置尾节点,如果尾节点与pred相等,则设置node为尾节点if (compareAndSetTail(pred, node)) {//之前尾节点的下一个节点为新的尾节点pred.next = node;//返回新的尾节点return node;}}//如果pred为空,插入新节点enq(node);return node;}/*** 将节点插入队列,如有必要,进行初始化。* * @param node*            要插入的节点* @return 插入节点的前驱节点*/private Node enq(final Node node) {//构建死循环,多次尝试,直到成功为止for (;;) {Node t = tail;//tail不存在,设置为首节点if (t == null) { // 必须初始化if (compareAndSetHead(new Node()))tail = head;} else {//设置为尾节点node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

3.2 CLH出列

 CLH同步队列遵循FIFO(先进先出),首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态

 

4. 同步状态的获取与释放

4.1 获取

 AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实现同步,主要是分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。自定义子类使用AQS提供的模板方法就可以实现自己的同步语义。

    独占式同步状态获取

    此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除获取到资源后。下面是acquire()的源码:

	/*** 以独占模式获取,忽略中断。* 至少调用一次{@link #tryAcquire}来实现,成功返回.* 否则线程排队,可能会重复阻塞和解除阻塞, 调用{@link #tryAcquire}直到成功.* 此方法可用于实现方法{@link Lock#lock}.*/public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();	//产生一个中断}
  • tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由自定义同步组件自己实现(通过state的get/set/CAS),该方法必须要保证线程安全的获取同步状态。

  • addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部,并标记为独占模式。

  • acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;如果在整个等待过程中被中断过,则返回true,否则返回false。

  • selfInterrupt:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

4.1.1 tryAcquire(int)

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}

该方法直接抛出异常,具体实现交自定义同步器类实现。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

4.1.2 acquireQueued

在执行到此方法时已经说明一点:该线程获取资源失败,已经被放入等待队列尾部了(先运行了addWaiter(Node.EXCLUSIVE))。所以 acquireQueued方法就是让线程进入等待状态休息,直到其他线程彻底释放资源后唤醒该线程,获取所需资源,然后执行该线程所需执行的任务。

   acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自我观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

final boolean acquireQueued(final Node node, int arg) {/* 标记是否成功拿到资源 */boolean failed = true;try {/* 中断标志*/boolean interrupted = false;/*  自旋,一个死循环 */for (;;) {/* 获取前线程的前驱节点*/final Node p = node.predecessor();/*当前线程的前驱节点是头结点,即该节点是第二个节点,且同步状态成功*/if (p == head && tryAcquire(arg)) {/*将head指向该节点*/setHead(node);/* 方便GC回收垃圾 */p.next = null; failed = false;/*返回等待过程中是否被中断过*/return interrupted;}/*获取失败,线程就进入waiting状态,直到被unpark()*/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())/*如果等待过程中被中断过一次,就标记为true*/interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:

  • 保持FIFO同步队列原则。

  • 头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

4.1.2.1 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,查看当前节点是否进入waiting状态

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//拿到前驱节点的状态if (ws == Node.SIGNAL)//状态为SIGNAL,如果前驱节点处于等待状态,直接返回truereturn true;if (ws > 0) {/** 如果前驱节点放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被GC回收*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱节点正常,那就把前驱的状态通过CAS的方式设置成SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}

这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

  1. 如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞

  2. 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

  3. 如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false

      整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能被阻塞,需要去找个安心的休息点(前驱节点状态 <= 0 ),同时可以再尝试下看有没有机会去获取资源。

     如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用

4.1.2.2 parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {//调用park()使线程进入waiting状态LockSupport.park(this); //如果被唤醒,查看自己是不是被中断的return Thread.interrupted();}

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。

4.2 释放

释放时,需要进行“CLH出列”,出列的主要工作则是唤醒其后继节点(一般来说就是head节点),让所有线程有序地进行下去:

	/*** 举例以独占模式释放* 以独占模式释放. 通过解除阻塞一个或多个线程实现,如果{@link #tryRelease}返回true。 * 这种方法可以用来实现方法 {@link Lock#unlock}.** @return 从{@link #tryRelease}返回的值*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);	//唤醒该节点的后继节点return true;}return false;}/*** 类似tryAcquire都需要使用AQS框架的时候去实现* 尝试设置状态以独占模式释放.该方法总是由执行释放的线程调用.* @return {@code true} * 				如果此对象现在处于完全释放状态,那么任何等待的线程都可能尝试获取; * @throws UnsupportedOperationException*             如果不支持独占模式*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 唤醒该节点的后继节点,如果存在。*/private void unparkSuccessor(Node node) {/** 如果状态为负(即,可能需要信号),则试图在预期信号中清除. * 如果这样做失败或状态是否被等待线程改变就可以了。*/int ws = node.waitStatus;if (ws < 0)//毕竟并且设置状态为0compareAndSetWaitStatus(node, ws, 0);/** 断开线程被保存在后台,这通常只是下一个节点. * 但是,如果取消或显然为空,则从尾部向后移动以找到实际的未取消的后继者。*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

这篇关于多线程-AQS浅析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/983053

相关文章

浅析CSS 中z - index属性的作用及在什么情况下会失效

《浅析CSS中z-index属性的作用及在什么情况下会失效》z-index属性用于控制元素的堆叠顺序,值越大,元素越显示在上层,它需要元素具有定位属性(如relative、absolute、fi... 目录1. z-index 属性的作用2. z-index 失效的情况2.1 元素没有定位属性2.2 元素处

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

JAVA封装多线程实现的方式及原理

《JAVA封装多线程实现的方式及原理》:本文主要介绍Java中封装多线程的原理和常见方式,通过封装可以简化多线程的使用,提高安全性,并增强代码的可维护性和可扩展性,需要的朋友可以参考下... 目录前言一、封装的目标二、常见的封装方式及原理总结前言在 Java 中,封装多线程的原理主要围绕着将多线程相关的操

浅析Python中的绝对导入与相对导入

《浅析Python中的绝对导入与相对导入》这篇文章主要为大家详细介绍了Python中的绝对导入与相对导入的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1 Imports快速介绍2 import语句的语法2.1 基本使用2.2 导入声明的样式3 绝对import和相对i

Python中多线程和多进程的基本用法详解

《Python中多线程和多进程的基本用法详解》这篇文章介绍了Python中多线程和多进程的相关知识,包括并发编程的优势,多线程和多进程的概念、适用场景、示例代码,线程池和进程池的使用,以及如何选择合适... 目录引言一、并发编程的主要优势二、python的多线程(Threading)1. 什么是多线程?2.

SpringBoot中使用 ThreadLocal 进行多线程上下文管理及注意事项小结

《SpringBoot中使用ThreadLocal进行多线程上下文管理及注意事项小结》本文详细介绍了ThreadLocal的原理、使用场景和示例代码,并在SpringBoot中使用ThreadLo... 目录前言技术积累1.什么是 ThreadLocal2. ThreadLocal 的原理2.1 线程隔离2

Java多线程父线程向子线程传值问题及解决

《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco... 目录1 背景2 ThreadLocal+TaskDecorator3 RequestContextH

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可