多线程-AQS浅析

2024-05-12 15:48
文章标签 多线程 浅析 aqs

本文主要是介绍多线程-AQS浅析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1. AQS简介

2、AQS常用方法

  2.1 关于state的方法主要有一下三种

  2.2 自定义同步器实现时主要实现以下几种方法

  2.3 其余方法

3、CLH

3.1 CLH入列

3.2 CLH出列

4. 同步状态的获取与释放

4.1 获取

4.1.1 tryAcquire(int)

4.1.2 acquireQueued

4.2 释放


 

1. AQS简介

  • AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。
JUC包下的核心基础组件。也是实现大部分同步需求的基础。学习该组件是学习JUC绕不开的一块内容。
  • AQS解决了子类实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,所以使用AQS不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
  • 在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
  • AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
  • AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
  • AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
#同步队列
同步队列是AQS很重要的组成部分,它是一个双端队列,遵循FIFO原则,主要作用是用来存放在锁上阻塞的线程,
当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个Node节点假如到同步队列的尾部,队
列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。

2、AQS常用方法

  2.1 关于state的方法主要有一下三种

  • getState():返回同步状态的当前值;

  • setState(int newState):设置当前同步状态;

  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

  2.2 自定义同步器实现时主要实现以下几种方法

  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态

  • tryRelease(int arg):独占式释放同步状态;

  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

  • tryReleaseShared(int arg):共享式释放同步状态;

  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

  2.3 其余方法

  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

  • releaseShared(int arg):共享式释放同步状态;

3、CLH

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下

   其实就是个双端双向链表

   源码定义如下

	/*** AQS会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列*/static final class Node {/** 表明节点正在等待共享模式的标记 */static final Node SHARED = new Node();/** 表明节点正在等待独占模式的标记 */static final Node EXCLUSIVE = null;/** waitStatus值表明线程已取消 */static final int CANCELLED = 1;/** waitStatus值表示后续线程需要运行 */static final int SIGNAL = -1;/** waitStatus值表示当前节点在等待condition,也就是在condition队列中*//** 节点在等待队列中,节点线程在等待Condition,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步 */static final int CONDITION = -2;/*** waitStatus值指示下一个acquireShared应无条件地传播*/static final int PROPAGATE = -3;/*** 等待状态,仅接受以下值: * SIGNAL:* 		该节点的后继者(或将很快被)阻止(通过park),因此当前节点在释放或取消时必须取消其继承者* 		为了避免竞争,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞. * CANCELLED:* 		该节点由于超时或中断而被取消。 节点永远不会离开这个状态。特别地,具有被取消节点的线程再也不会被阻塞. * CONDITION:* 		此节点当前处于条件队列. 在传输之前,它不会用作同步队列节点,此时状态将设置为0.                 * PROPAGATE:*     releaseShared应该传播到其他节点.*     在doReleaseShared中设置(仅适用于头节点),以确保传播继续,即使其他操作已经干预.** 这些值以数字排列以简化使用. 非负值意味着节点不需要信号。所以,大多数代码不需要检查特定的值,只是为了符号。** 对于一般同步节点,该字段被初始化为0,条件节点的CONDITION被初始化 它使用CAS(或可能的话,无条件的易失性写入)进行修改。* CANCELLED 取消状态* SIGNAL 等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED* CONDITION 等待条件状态,在等待队列中* PROPAGATE 状态需要向后传播*/volatile int waitStatus;/*** 前驱节点:* 链接到当前节点/线程依赖于用于检查waitStatus的前导节点。 * 在排队期间分配,并且仅在出队时才被设为null(为了GC)。*/volatile Node prev;/***  后继节点:* 在排队期间分配,在绕过取消的前辈时进行调整,并在出局时为null排除(为了GC).* 看到一个空的下一个字段不一定意味着该节点在队列的结尾.* 因此,如果下一个字段看起来是空的,我们可以从尾部扫描prev来进行双重检查. * 被取消节点的下一个字段被设置为指向节点本身而不是null。*/volatile Node next;/*** 启动该节点的线程. 在构造器中初始化,并且使用后置为null*/volatile Thread thread;/*** 链接到下一个节点等待状态,或者特殊值SHARED.* 因为条件队列只能在独占模式下进行访问, 我们只需要一个简单的链接队列来保存节点,当他们在等待条件时. * 然后将它们转移到队列中以重新获取. * 并且因为条件只能是排他的,所以我们使用特殊的值来保存一个字段来表示共享模式.*/ Node nextWaiter;

可以看到AQS支持两种同步模式,分别是Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。这样方便使用者实现不同类型的同步组件。简而言之,AQS为使用者提供了多样的底层支撑,具体如何组装实现,使用者可以自由发挥。

3.1 CLH入列

CHL这种链表式结构入列,无非就是tail指向新节点、新节点的前驱节点指向当前最后的节点,当前最后一个节点的next指向当前节点,直接看源码相关操作在addWaiter(Node node)方法里。此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点  

	/*** 为当前线程和给定模式创建和排队节点。** @param mode*            Node.EXCLUSIVE用于独占,Node.SHARED用于共享* @return 新节点*/private Node addWaiter(Node mode) {//新建节点Node node = new Node(Thread.currentThread(), mode);// 快速尝试添加尾节点Node pred = tail;if (pred != null) {//新节点的前驱节点为尾节点node.prev = pred;//比较并且设置尾节点,如果尾节点与pred相等,则设置node为尾节点if (compareAndSetTail(pred, node)) {//之前尾节点的下一个节点为新的尾节点pred.next = node;//返回新的尾节点return node;}}//如果pred为空,插入新节点enq(node);return node;}/*** 将节点插入队列,如有必要,进行初始化。* * @param node*            要插入的节点* @return 插入节点的前驱节点*/private Node enq(final Node node) {//构建死循环,多次尝试,直到成功为止for (;;) {Node t = tail;//tail不存在,设置为首节点if (t == null) { // 必须初始化if (compareAndSetHead(new Node()))tail = head;} else {//设置为尾节点node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

3.2 CLH出列

 CLH同步队列遵循FIFO(先进先出),首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态

 

4. 同步状态的获取与释放

4.1 获取

 AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实现同步,主要是分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。自定义子类使用AQS提供的模板方法就可以实现自己的同步语义。

    独占式同步状态获取

    此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除获取到资源后。下面是acquire()的源码:

	/*** 以独占模式获取,忽略中断。* 至少调用一次{@link #tryAcquire}来实现,成功返回.* 否则线程排队,可能会重复阻塞和解除阻塞, 调用{@link #tryAcquire}直到成功.* 此方法可用于实现方法{@link Lock#lock}.*/public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();	//产生一个中断}
  • tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由自定义同步组件自己实现(通过state的get/set/CAS),该方法必须要保证线程安全的获取同步状态。

  • addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部,并标记为独占模式。

  • acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;如果在整个等待过程中被中断过,则返回true,否则返回false。

  • selfInterrupt:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

4.1.1 tryAcquire(int)

protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}

该方法直接抛出异常,具体实现交自定义同步器类实现。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

4.1.2 acquireQueued

在执行到此方法时已经说明一点:该线程获取资源失败,已经被放入等待队列尾部了(先运行了addWaiter(Node.EXCLUSIVE))。所以 acquireQueued方法就是让线程进入等待状态休息,直到其他线程彻底释放资源后唤醒该线程,获取所需资源,然后执行该线程所需执行的任务。

   acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自我观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

final boolean acquireQueued(final Node node, int arg) {/* 标记是否成功拿到资源 */boolean failed = true;try {/* 中断标志*/boolean interrupted = false;/*  自旋,一个死循环 */for (;;) {/* 获取前线程的前驱节点*/final Node p = node.predecessor();/*当前线程的前驱节点是头结点,即该节点是第二个节点,且同步状态成功*/if (p == head && tryAcquire(arg)) {/*将head指向该节点*/setHead(node);/* 方便GC回收垃圾 */p.next = null; failed = false;/*返回等待过程中是否被中断过*/return interrupted;}/*获取失败,线程就进入waiting状态,直到被unpark()*/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())/*如果等待过程中被中断过一次,就标记为true*/interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:

  • 保持FIFO同步队列原则。

  • 头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

4.1.2.1 shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,查看当前节点是否进入waiting状态

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//拿到前驱节点的状态if (ws == Node.SIGNAL)//状态为SIGNAL,如果前驱节点处于等待状态,直接返回truereturn true;if (ws > 0) {/** 如果前驱节点放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被GC回收*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱节点正常,那就把前驱的状态通过CAS的方式设置成SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}

这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

  1. 如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞

  2. 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

  3. 如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false

      整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能被阻塞,需要去找个安心的休息点(前驱节点状态 <= 0 ),同时可以再尝试下看有没有机会去获取资源。

     如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用

4.1.2.2 parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {//调用park()使线程进入waiting状态LockSupport.park(this); //如果被唤醒,查看自己是不是被中断的return Thread.interrupted();}

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。

4.2 释放

释放时,需要进行“CLH出列”,出列的主要工作则是唤醒其后继节点(一般来说就是head节点),让所有线程有序地进行下去:

	/*** 举例以独占模式释放* 以独占模式释放. 通过解除阻塞一个或多个线程实现,如果{@link #tryRelease}返回true。 * 这种方法可以用来实现方法 {@link Lock#unlock}.** @return 从{@link #tryRelease}返回的值*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);	//唤醒该节点的后继节点return true;}return false;}/*** 类似tryAcquire都需要使用AQS框架的时候去实现* 尝试设置状态以独占模式释放.该方法总是由执行释放的线程调用.* @return {@code true} * 				如果此对象现在处于完全释放状态,那么任何等待的线程都可能尝试获取; * @throws UnsupportedOperationException*             如果不支持独占模式*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 唤醒该节点的后继节点,如果存在。*/private void unparkSuccessor(Node node) {/** 如果状态为负(即,可能需要信号),则试图在预期信号中清除. * 如果这样做失败或状态是否被等待线程改变就可以了。*/int ws = node.waitStatus;if (ws < 0)//毕竟并且设置状态为0compareAndSetWaitStatus(node, ws, 0);/** 断开线程被保存在后台,这通常只是下一个节点. * 但是,如果取消或显然为空,则从尾部向后移动以找到实际的未取消的后继者。*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

这篇关于多线程-AQS浅析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/983053

相关文章

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

多线程解析报表

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。 Way1 join import java.time.LocalTime;public class Main {public static void main(String[] args) thro

Java 多线程概述

多线程技术概述   1.线程与进程 进程:内存中运行的应用程序,每个进程都拥有一个独立的内存空间。线程:是进程中的一个执行路径,共享一个内存空间,线程之间可以自由切换、并发执行,一个进程最少有一个线程,线程实际数是在进程基础之上的进一步划分,一个进程启动之后,进程之中的若干执行路径又可以划分成若干个线程 2.线程的调度 分时调度:所有线程轮流使用CPU的使用权,平均分配时间抢占式调度

Java 多线程的基本方式

Java 多线程的基本方式 基础实现两种方式: 通过实现Callable 接口方式(可得到返回值):

JAVA- 多线程

一,多线程的概念 1.并行与并发 并行:多个任务在同一时刻在cpu 上同时执行并发:多个任务在同一时刻在cpu 上交替执行 2.进程与线程 进程:就是操作系统中正在运行的一个应用程序。所以进程也就是“正在进行的程序”。(Windows系统中,我们可以在任务管理器中看 到进程) 线程:是程序运行的基本执行单元。当操作系统执行一个程序时, 会在系统中建立一个进程,该进程必须至少建立一个线

(入门篇)JavaScript 网页设计案例浅析-简单的交互式图片轮播

网页设计已经成为了每个前端开发者的必备技能,而 JavaScript 作为前端三大基础之一,更是为网页赋予了互动性和动态效果。本篇文章将通过一个简单的 JavaScript 案例,带你了解网页设计中的一些常见技巧和技术原理。今天就说一说一个常见的图片轮播效果。相信大家在各类电商网站、个人博客或者展示页面中,都看到过这种轮播图。它的核心功能是展示多张图片,并且用户可以通过点击按钮,左右切换图片。

多线程篇(阻塞队列- LinkedBlockingDeque)(持续更新迭代)

目录 一、LinkedBlockingDeque是什么 二、核心属性详解 三、核心方法详解 addFirst(E e) offerFirst(E e) putFirst(E e) removeFirst() pollFirst() takeFirst() 其他 四、总结 一、LinkedBlockingDeque是什么 首先queue是一种数据结构,一个集合中

多线程篇(阻塞队列- LinkedBlockingQueue)(持续更新迭代)

目录 一、基本概要 1. 构造函数 2. 内部成员 二、非阻塞式添加元素:add、offer方法原理 offer的实现 enqueue入队操作 signalNotEmpty唤醒 删除线程(如消费者线程) 为什么要判断if (c == 0)时才去唤醒消费线程呢? 三、阻塞式添加元素:put 方法原理 图解:put线程的阻塞过程 四、非阻塞式移除:poll方法原理 dequ

spring笔记 多线程的支持

spring的工作机制 136  属性编辑器 140 spring事件的体系结构 168 Bean间的关系 109 继承 依赖 引用     Bean的继承          1 为了简化初始化的属性注入;          2 子Bean和父Bean相同的属性值,使用子Bean的     Bean的依赖 Srping控制相互依赖的Bean之间,属性注入的顺序,防止出错  depend-on

【编程底层思考】详解Java的JUC多线程并发编程底层组件AQS的作用及原理

Java中的AbstractQueuedSynchronizer(简称AQS)是位于java.util.concurrent.locks包中的一个核心组件,用于构建锁和其他同步器。AQS为实现依赖于FIFO(先进先出)等待队列的阻塞锁和相关同步器提供了一套高效、可扩展的框架。 一、AQS的作用 统一同步状态管理:AQS提供了一个int类型的成员变量state,用于表示同步状态。子类可以根据自己