本文主要是介绍多线程-AQS浅析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
1. AQS简介
2、AQS常用方法
2.1 关于state的方法主要有一下三种
2.2 自定义同步器实现时主要实现以下几种方法
2.3 其余方法
3、CLH
3.1 CLH入列
3.2 CLH出列
4. 同步状态的获取与释放
4.1 获取
4.1.1 tryAcquire(int)
4.1.2 acquireQueued
4.2 释放
1. AQS简介
- AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。
JUC包下的核心基础组件。也是实现大部分同步需求的基础。学习该组件是学习JUC绕不开的一块内容。
- AQS解决了子类实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,所以使用AQS不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
- 在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
- AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
- AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
- AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
#同步队列
同步队列是AQS很重要的组成部分,它是一个双端队列,遵循FIFO原则,主要作用是用来存放在锁上阻塞的线程,
当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个Node节点假如到同步队列的尾部,队
列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。
2、AQS常用方法
2.1 关于state的方法主要有一下三种
-
getState():返回同步状态的当前值;
-
setState(int newState):设置当前同步状态;
-
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
2.2 自定义同步器实现时主要实现以下几种方法
-
tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态
-
tryRelease(int arg):独占式释放同步状态;
-
tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
-
tryReleaseShared(int arg):共享式释放同步状态;
-
isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
2.3 其余方法
-
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
-
acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
-
tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
-
acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
-
acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
-
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
-
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
-
releaseShared(int arg):共享式释放同步状态;
3、CLH
CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下
其实就是个双端双向链表。
源码定义如下
/*** AQS会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列*/static final class Node {/** 表明节点正在等待共享模式的标记 */static final Node SHARED = new Node();/** 表明节点正在等待独占模式的标记 */static final Node EXCLUSIVE = null;/** waitStatus值表明线程已取消 */static final int CANCELLED = 1;/** waitStatus值表示后续线程需要运行 */static final int SIGNAL = -1;/** waitStatus值表示当前节点在等待condition,也就是在condition队列中*//** 节点在等待队列中,节点线程在等待Condition,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步 */static final int CONDITION = -2;/*** waitStatus值指示下一个acquireShared应无条件地传播*/static final int PROPAGATE = -3;/*** 等待状态,仅接受以下值: * SIGNAL:* 该节点的后继者(或将很快被)阻止(通过park),因此当前节点在释放或取消时必须取消其继承者* 为了避免竞争,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞. * CANCELLED:* 该节点由于超时或中断而被取消。 节点永远不会离开这个状态。特别地,具有被取消节点的线程再也不会被阻塞. * CONDITION:* 此节点当前处于条件队列. 在传输之前,它不会用作同步队列节点,此时状态将设置为0. * PROPAGATE:* releaseShared应该传播到其他节点.* 在doReleaseShared中设置(仅适用于头节点),以确保传播继续,即使其他操作已经干预.** 这些值以数字排列以简化使用. 非负值意味着节点不需要信号。所以,大多数代码不需要检查特定的值,只是为了符号。** 对于一般同步节点,该字段被初始化为0,条件节点的CONDITION被初始化 它使用CAS(或可能的话,无条件的易失性写入)进行修改。* CANCELLED 取消状态* SIGNAL 等待触发状态,前节点可能是head或者前节点为取消状态CANCELLED* CONDITION 等待条件状态,在等待队列中* PROPAGATE 状态需要向后传播*/volatile int waitStatus;/*** 前驱节点:* 链接到当前节点/线程依赖于用于检查waitStatus的前导节点。 * 在排队期间分配,并且仅在出队时才被设为null(为了GC)。*/volatile Node prev;/*** 后继节点:* 在排队期间分配,在绕过取消的前辈时进行调整,并在出局时为null排除(为了GC).* 看到一个空的下一个字段不一定意味着该节点在队列的结尾.* 因此,如果下一个字段看起来是空的,我们可以从尾部扫描prev来进行双重检查. * 被取消节点的下一个字段被设置为指向节点本身而不是null。*/volatile Node next;/*** 启动该节点的线程. 在构造器中初始化,并且使用后置为null*/volatile Thread thread;/*** 链接到下一个节点等待状态,或者特殊值SHARED.* 因为条件队列只能在独占模式下进行访问, 我们只需要一个简单的链接队列来保存节点,当他们在等待条件时. * 然后将它们转移到队列中以重新获取. * 并且因为条件只能是排他的,所以我们使用特殊的值来保存一个字段来表示共享模式.*/ Node nextWaiter;
可以看到AQS支持两种同步模式,分别是Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。这样方便使用者实现不同类型的同步组件。简而言之,AQS为使用者提供了多样的底层支撑,具体如何组装实现,使用者可以自由发挥。
3.1 CLH入列
CHL这种链表式结构入列,无非就是tail指向新节点、新节点的前驱节点指向当前最后的节点,当前最后一个节点的next指向当前节点,直接看源码相关操作在addWaiter(Node node)方法里。此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点
/*** 为当前线程和给定模式创建和排队节点。** @param mode* Node.EXCLUSIVE用于独占,Node.SHARED用于共享* @return 新节点*/private Node addWaiter(Node mode) {//新建节点Node node = new Node(Thread.currentThread(), mode);// 快速尝试添加尾节点Node pred = tail;if (pred != null) {//新节点的前驱节点为尾节点node.prev = pred;//比较并且设置尾节点,如果尾节点与pred相等,则设置node为尾节点if (compareAndSetTail(pred, node)) {//之前尾节点的下一个节点为新的尾节点pred.next = node;//返回新的尾节点return node;}}//如果pred为空,插入新节点enq(node);return node;}/*** 将节点插入队列,如有必要,进行初始化。* * @param node* 要插入的节点* @return 插入节点的前驱节点*/private Node enq(final Node node) {//构建死循环,多次尝试,直到成功为止for (;;) {Node t = tail;//tail不存在,设置为首节点if (t == null) { // 必须初始化if (compareAndSetHead(new Node()))tail = head;} else {//设置为尾节点node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
3.2 CLH出列
CLH同步队列遵循FIFO(先进先出),首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态
4. 同步状态的获取与释放
4.1 获取
AQS的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态,对于子类而言它并没有太多的活要做,AQS提供了大量的模板方法来实现同步,主要是分为三类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的等待线程情况。自定义子类使用AQS提供的模板方法就可以实现自己的同步语义。
独占式同步状态获取
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除获取到资源后。下面是acquire()的源码:
/*** 以独占模式获取,忽略中断。* 至少调用一次{@link #tryAcquire}来实现,成功返回.* 否则线程排队,可能会重复阻塞和解除阻塞, 调用{@link #tryAcquire}直到成功.* 此方法可用于实现方法{@link Lock#lock}.*/public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); //产生一个中断}
-
tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由自定义同步组件自己实现(通过state的get/set/CAS),该方法必须要保证线程安全的获取同步状态。
-
addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部,并标记为独占模式。
-
acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;如果在整个等待过程中被中断过,则返回true,否则返回false。
-
selfInterrupt:如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
4.1.1 tryAcquire(int)
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
该方法直接抛出异常,具体实现交自定义同步器类实现。这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
4.1.2 acquireQueued
在执行到此方法时已经说明一点:该线程获取资源失败,已经被放入等待队列尾部了(先运行了addWaiter(Node.EXCLUSIVE))。所以 acquireQueued方法就是让线程进入等待状态休息,直到其他线程彻底释放资源后唤醒该线程,获取所需资源,然后执行该线程所需执行的任务。
acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自我观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。
final boolean acquireQueued(final Node node, int arg) {/* 标记是否成功拿到资源 */boolean failed = true;try {/* 中断标志*/boolean interrupted = false;/* 自旋,一个死循环 */for (;;) {/* 获取前线程的前驱节点*/final Node p = node.predecessor();/*当前线程的前驱节点是头结点,即该节点是第二个节点,且同步状态成功*/if (p == head && tryAcquire(arg)) {/*将head指向该节点*/setHead(node);/* 方便GC回收垃圾 */p.next = null; failed = false;/*返回等待过程中是否被中断过*/return interrupted;}/*获取失败,线程就进入waiting状态,直到被unpark()*/if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())/*如果等待过程中被中断过一次,就标记为true*/interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:
-
保持FIFO同步队列原则。
-
头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。
4.1.2.1 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查状态,查看当前节点是否进入waiting状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//拿到前驱节点的状态if (ws == Node.SIGNAL)//状态为SIGNAL,如果前驱节点处于等待状态,直接返回truereturn true;if (ws > 0) {/** 如果前驱节点放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被GC回收*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱节点正常,那就把前驱的状态通过CAS的方式设置成SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}
这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
-
如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
-
如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
-
如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能被阻塞,需要去找个安心的休息点(前驱节点状态 <= 0 ),同时可以再尝试下看有没有机会去获取资源。
如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用
4.1.2.2 parkAndCheckInterrupt()方法阻塞当前线程:
private final boolean parkAndCheckInterrupt() {//调用park()使线程进入waiting状态LockSupport.park(this); //如果被唤醒,查看自己是不是被中断的return Thread.interrupted();}
parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。
4.2 释放
释放时,需要进行“CLH出列”,出列的主要工作则是唤醒其后继节点(一般来说就是head节点),让所有线程有序地进行下去:
/*** 举例以独占模式释放* 以独占模式释放. 通过解除阻塞一个或多个线程实现,如果{@link #tryRelease}返回true。 * 这种方法可以用来实现方法 {@link Lock#unlock}.** @return 从{@link #tryRelease}返回的值*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); //唤醒该节点的后继节点return true;}return false;}/*** 类似tryAcquire都需要使用AQS框架的时候去实现* 尝试设置状态以独占模式释放.该方法总是由执行释放的线程调用.* @return {@code true} * 如果此对象现在处于完全释放状态,那么任何等待的线程都可能尝试获取; * @throws UnsupportedOperationException* 如果不支持独占模式*/protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}/*** 唤醒该节点的后继节点,如果存在。*/private void unparkSuccessor(Node node) {/** 如果状态为负(即,可能需要信号),则试图在预期信号中清除. * 如果这样做失败或状态是否被等待线程改变就可以了。*/int ws = node.waitStatus;if (ws < 0)//毕竟并且设置状态为0compareAndSetWaitStatus(node, ws, 0);/** 断开线程被保存在后台,这通常只是下一个节点. * 但是,如果取消或显然为空,则从尾部向后移动以找到实际的未取消的后继者。*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
这篇关于多线程-AQS浅析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!