JDK源码——AbstractQueuedSynchronizer源码

2023-10-30 14:59

本文主要是介绍JDK源码——AbstractQueuedSynchronizer源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要

AQS 就是一个抽象类,继承了AbstractOwnableSynchronizer ,主要用来构建锁和同步器。这个类在 java.util.concurrent.locks 包下面,AQS为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如 ReentrantLock,Semaphore,其他的 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基于 AQS 的。

AQS原理

AQS核心思想是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。CLH(Craig, Landin, and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

 AQS变量

AQS 使用一个int成员变量state来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS 使用CAS对该同步状态进行原子操作实现对其值的修改。

public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {}
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

 Node类

static final class Node {// 共享模式static final Node SHARED = new Node();// 独占模式static final Node EXCLUSIVE = null;// waitStatus的几种状态static final int CANCELLED =  1;static final int SIGNAL    = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;volatile int waitStatus;// 前驱节点(主队列)volatile Node prev;// 后继节点(主队列)volatile Node next;// 节点的线程volatile Thread thread;// 后继节点(条件队列)Node nextWaiter;final boolean isShared() {return nextWaiter == SHARED;}final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {    // Used to establish initial head or SHARED marker}Node(Thread thread, Node mode) {     // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}
}
  •  Node pre:前驱节点,当前节点加入到同步队列中被设置(尾部添加)
  • Node next:后继节点
  • Thread thread:节点同步状态的线程
  • Node nextWaiter:等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
  • int waitStatus:等待状态,标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态,

CAS(Compare and Swap),比较并替换操作,CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。状态信息通过 protected 类型的getState(),setState(),compareAndSetState() 进行操作。volatile保证线程可见性,高并发场景下,即被一个线程修改后,状态会立马让其他线程可见,五种状态分别为:

  • CANCELLED,值为1,在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后将不会变化
  • SIGNAL,值为-1,后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
  • CONDITION,值为-2,节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
  • PROPAGATE ,值为-3,表示下一次共享式同步状态获取将会被无条件地传播下去
  • INITIAL,值为0,初始状态

AQS对资源的共享方式

Exclusive 独占式

只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁:

  • 公平锁 :按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁 :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)

公平锁和非公平锁只有两处不同:

  • 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
  • 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

Share 共享式

多个线程可同时执行,如 Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock;

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。

AQS 提供的模板方法

//该线程是否正在独占资源。只有用到condition才需要去实现它isHeldExclusively()//独占方式。尝试获取资源,成功则返回true,失败则返回falsetryAcquire(int)//独占方式。尝试释放资源,成功则返回true,失败则返回falsetryRelease(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源tryAcquireShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回falsetryReleaseShared(int)

AQS实战源码分析

package com.zhuangxiaoyan;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** @Classname test1* @Description TODO* @Date 2021/11/25 19:43* @Created by xjl*/
public class test1 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();//带入一个银行办理业务的案例来模拟我们的AQs如何进行线程的管理和通知唤醒机制new Thread(() -> {lock.lock();try {System.out.println("A----thread come in");TimeUnit.MINUTES.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "A").start();//由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,new Thread(() -> {lock.lock();try {System.out.println("A----thread come in");TimeUnit.MINUTES.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "B").start();//由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,new Thread(() -> {lock.lock();try {System.out.println("A----thread come in");TimeUnit.MINUTES.sleep(20);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}, "C").start();}
}

独占模式源码分析

独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):

  • acquire(int arg):以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;
  • acquireInterruptibly(int arg):以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;
  • tryAcquireNanos(int arg, long nanosTimeout):以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;
  • release(int arg):释放资源,可以类比 Lock 接口的 unlock 方法。

设置状态位

   protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}

Lock方法

/**
*Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}

tryAcquire()方法

        /*** Performs non-fair tryLock.  tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

AddWaiter()方法

//准备进入队列    
/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
//进入队列的方法/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是古位。真正的第一个有数据的节点,是从第二个节点开始的。

AcquireQueued()方法

    /*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private final boolean parkAndCheckInterrupt(){//threadB 被阻塞中,正在排队等待中LockSupprot.park(this);return Thread.interrupted();
}

TryRelease()方法

        protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

CAS的底层实现原理

AQS 内部通过 Unsafe 类实现了一系列 CAS (Compare And Swap) 操作。AQS 内部的许多操作是通过 CAS 来实现线程安全的。

// 获取 Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state、head、tail 等变量的内存偏移地址
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }
}// 一些 CAS 操作
private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);
}private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}private static final boolean compareAndSetNext(Node node,Node expect,Node update) {return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

AQS中Node独占模式

独占模式下的操作主要有以下几个方法(可与前面分析的 Lock 接口的方法类比):

  • acquire(int arg):以独占模式获取资源,忽略中断;可以类比 Lock 接口的 lock 方法;
  • acquireInterruptibly(int arg):以独占模式获取资源,响应中断;可以类比 Lock 接口的 lockInterruptibly 方法;
  • tryAcquireNanos(int arg, long nanosTimeout):以独占模式获取资源,响应中断,且有超时等待;可以类比 Lock 接口的 tryLock(long, TimeUnit) 方法;
  • release(int arg):释放资源,可以类比 Lock 接口的 unlock 方法。

Acquire()方法

public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

该方法看似很短,其实是内部做了封装。这几行代码包含了如下四个操作步骤:

  1. tryAcquire
  2. addWaiter(Node.EXECUSIVE)
  3. acquireQueued(final Node node, arg))
  4. selfInterrupt
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

该方法的作用是尝试以独占模式获取资源,若成功则返回 true。可以看到该方法是一个 protected 方法,而且 AQS 中该方法直接抛出了异常,其实是它把实现委托给了子类。这也是 ReentrantLock、CountdownLatch 等类(严格来说是其内部类 Sync)的实现功能不同的地方,这些类正是通过对该方法的不同实现来制定了自己的“游戏规则”。

若 step 1 中的 tryAcquire 方法返回 true,则表示当前线程获取资源成功,方法直接返回,该线程接下来就可以“为所欲为”了;否则表示获取失败,接下来会依次执行 step 2 和 step 3。

private Node addWaiter(Node mode) {// 将当前线程封装为一个 Node 节点,指定 mode// PS: 独占模式 Node.EXECUSIVE, 共享模式 Node.SHAREDNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;// 通过 CAS 操作设置主队列的尾节点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 尾节点 tail 为 null,表示主队列未初始化enq(node);return node;
}
private Node enq(final Node node) {for (;;) {Node t = tail;// 尾节点为空,表明当前队列未初始化if (t == null) { // Must initialize// 将队列的头尾节点都设置为一个新的节点if (compareAndSetHead(new Node()))tail = head;} else {// 将 node 节点插入主队列末尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

可以看到 addWaiter(Node.EXECUSIVE) 方法的作用是:把当前线程封装成一个独占模式的 Node 节点,并插入到主队列末尾(若主队列未初始化,则将其初始化后再插入)。

step 3: acquireQueued(final Node node, arg))

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 中断标志位boolean interrupted = false;for (;;) {// 获取该节点的前驱节点final Node p = node.predecessor();// 若前驱节点为头节点,则尝试获取资源if (p == head && tryAcquire(arg)) {// 若获取成功,则将该节点设置为头节点并返回setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 若上面条件不满足,即前驱节点不是头节点,或尝试获取失败// 判断当前线程是否可以休眠if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

若当前节点的前驱节点为头节点,则会再次尝试获取资源(tryAcuqire),若获取成功,则将当前节点设置为头节点并返回;否则若前驱节点不是头节点,或者获取资源失败,执行如下两个方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前驱节点的等待状态int ws = pred.waitStatus;// 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 若前驱节点的状态大于 0,表示前驱节点处于取消(CANCELLED)状态// 则将前驱节点跳过(相当于踢出队列)if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/// 此时 waitStatus 只能为 0 或 PROPAGATE 状态,将前驱节点的等着状态设置为 SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}

该方法的流程:

  1. 若前驱节点的等待状态为 SIGNAL,返回 true,表示当前线程可以休眠(park);
  2. 若前驱节点是取消状态 (ws > 0),则将其清理出队列,以此类推;
  3. 若前驱节点为 0 或 PROPAGATE,则将其设置为 SIGNAL 状态。

正如其名,该方法(shouldParkAfterFailedAcquire)的作用就是判断当前线程在获取资源失败后,是否可以休眠(park)。

private final boolean parkAndCheckInterrupt() {// 将当前线程休眠LockSupport.park(this);return Thread.interrupted();
}

该方法的作用:

  1. 使当前线程休眠(park);
  2. 返回该线程是否被中断(其他线程对其发过中断信号)

上面就是 acquireQueued(final Node node, arg)) 方法的执行过程,为了便于理解,可参考下面的流程图:

若此期间被其他线程中断过,则此时再去执行 selfInterrupt 方法去响应中断请求: 

static void selfInterrupt() {Thread.currentThread().interrupt();
}

以上就是 acquire 方法执行的整体流程。

acquireInterruptibly()(响应中断)

该操作其实与前面的过程类似,因此分析相对简单些,代码如下:

public final void acquireInterruptibly(int arg)throws InterruptedException // 若线程被中断过,则抛出异常if (Thread.interrupted())throw new InterruptedException();// 尝试获取资源if (!tryAcquire(arg))// 尝试获取资源失败doAcquireInterruptibly(arg);
}

tryAcquire 与前面的操作一样,若尝试获取资源成功则直接返回;否则,执行doAcquireInterruptibly:

private void doAcquireInterruptibly(int arg)throws InterruptedException // 将当前线程封装成 Node 节点插入主队列末尾final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 抛出中断异常throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

通过与前面的 acquire 方法对比可以发现,二者代码几乎一样,区别在于 acquire 方法检测到中断(parkAndCheckInterrupt)时只是记录了标志位,并未响应;而此处直接抛出了异常。这也是二者仅有的区别。

tryAcquireNanos()(响应中断,且有超时)

public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException // 若被中断,则响应if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);
}
static final long spinForTimeoutThreshold = 1000L;private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 若超时时间小于等于 0,直接获取失败if (nanosTimeout <= 0L)return false;// 计算截止时间final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();// 已经超时了,获取失败if (nanosTimeout <= 0L)return false;// 若大于自旋时间,则线程休眠;否则自旋if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);// 若被中断,则响应if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

这里有个变量 spinForTimeoutThreshold,表示自旋时间,若大于该值则将线程休眠,否则继续自旋。个人理解这里增加该时间是为了提高效率,即,只有在等待时间较长的时候才让线程休眠。该方法与 acquireInterruptibly 也是类似的,在前者的基础上增加了 timeout。

release()释放资源

前面分析了三种获取资源的方式,自然也有释放资源。下面分析释放资源的 release 操作:

public final boolean release(int arg) {// 尝试释放资源,若成功则返回 trueif (tryRelease(arg)) {Node h = head;// 若头节点不为空,且等待状态不为 0(此时为 SIGNAL)// 则唤醒其后继节点if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

与 tryAcquire 方法类似,tryRelease 方法在 AQS 中也是抛出异常,同样交由子类实现:

protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}

unparkSuccessor 的主要作用是唤醒 node 的后继节点,代码如下:

private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 后继节点Node s = node.next;if (s == null || s.waitStatus > 0) {// 若后继节点是取消状态,则从尾节点向前遍历,找到 node 节点后面一个未取消状态的节点s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒node节点的后继节点if (s != null)LockSupport.unpark(s.thread);
}

若 node 节点的后继节点是取消状态(ws > 0),则从主队列中取其后面一个非取消状态的线程唤醒。前面三个获取资源的方法中,finally 代码块中都用到了 cancelAcquire 方法,都是获取失败时的操作,这里也分析一下:

private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessors// 跳过取消状态的前驱节点Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.// 前驱节点的后继节点引用Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.// 将当前节点设置为取消状态node.waitStatus = Node.CANCELLED;// If we are the tail, remove ourselves.// 若该节点为尾节点(后面没其他节点了),将 predNext 指向 nullif (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 前驱节点为头节点,表明当前节点为第一个,取消时唤醒它的下一个节点unparkSuccessor(node);}node.next = node; // help GC}
}

该方法的主要操作:

  1. 将 node 节点设置为取消(CANCELLED)状态;
  2. 找到它在队列中非取消状态的前驱节点 pred:
    1. 若 node 节点是尾节点,则前驱节点的后继设为空,
    2. 若 pred 不是头节点,且状态为 SIGNAL,则后继节点设为 node 的后继节点;
    3. 若 pred 是头节点,则唤醒 node 的后继节点。

该过程可以跟双链表删除一个节点的过程进行对比分析

AQS中Node共享模式

与独占模式类似,共享模式下也有与之类似的相应操作,分别如下:

  1. acquireShared(int arg): 以共享模式获取资源,忽略中断;
  2. acquireSharedInterruptibly(int arg): 以共享模式获取资源,响应中断;
  3. tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取资源,响应中断,且有超时等待;
  4. releaseShared(int arg): 释放资源,唤醒后继节点,并确保传播。

acquireShared()(忽略中断)

public final void acquireShared(int arg) {// 返回值小于 0,表示获取失败if (tryAcquireShared(arg) < 0)doAcquireShared(arg);
}// 尝试以共享模式获取资源(返回值为 int 类型)
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

与独占模式的 tryAcquire 方法类似,tryAcquireShared 方法在 AQS 中也抛出异常,由子类实现其逻辑。不同的地方在于,tryAcquire 方法的返回结果是 boolean 类型,表示获取成功与否;而 tryAcquireShared 的返回结果是 int 类型,分别为:

  1. 负数:表示获取失败;
  2. 零:表示获取成功,但后续共享模式的获取会失败;
  3. 正数:表示获取成功,后续共享模式的获取可能会成功(需要进行检测)

若 tryAcquireShared 获取成功,则直接返回;否则执行 doAcquireShared 方法:

private void doAcquireShared(int arg) {// 把当前线程封装成共享模式的 Node 节点,插入主队列末尾final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 中断标志位boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 若前驱节点为头节点,则尝试获取资源if (p == head) {int r = tryAcquireShared(arg);// 这里表示当前线程成功获取到了资源if (r >= 0) {// 设置头节点,并传播状态(注意这里与独占模式不同)setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 是否应该休眠(与独占模式相同,不再赘述)if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 取消操作(与独占模式相同)cancelAcquire(node);}
}

doAcquireShared 方法会把当前线程封装成一个共享模式(SHARED)的节点,并插入主队列末尾。addWaiter(Node mode) 方法前文已经分析过。该方法与 acquireQueued 方法的区别在于 setHeadAndPropagate 方法,把当前节点设置为头节点之后,还会有传播(propagate)行为。

private void setHeadAndPropagate(Node node, int propagate) {// 记录旧的头节点Node h = head; // Record old head for check below// 将 node 设置为头节点setHead(node);/** Try to signal next queued node if:*   Propagation was indicated by caller,*     or was recorded (as h.waitStatus either before*     or after setHead) by a previous operation*     (note: this uses sign-check of waitStatus because*      PROPAGATE status may transition to SIGNAL.)* and*   The next node is waiting in shared mode,*     or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 后继节点为空或共享模式唤醒if (s == null || s.isShared())doReleaseShared();}
}
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.  This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {// 这里的头节点已经是上面设置后的头节点了Node h = head;// 由于该方法有两个入口(setHeadAndPropagate 和 releaseShared),需考虑并发控制if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 唤醒后继节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 若头节点不变,则跳出循环;否则继续循环if (h == head)                   // loop if head changedbreak;}
}

该方法与独占模式下的获取方法 acquire 大体相似,不同在于该方法中,节点获取资源后会传播状态,即,有可能会继续唤醒后继节点。值得注意的是:该方法有两个入口 setHeadAndPropagate 和 releaseShared,可能有多个线程操作,需考虑并发控制。

acquireSharedInterruptibly()(响应中断)

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

tryAcquireShared 方法前面已分析,若获取资源失败,会执行 doAcquireSharedInterruptly 方法:

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 把当前线程封装成共享模式节点,并插入主队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 与 doAcquireShared 相比,区别在于这里抛出了异常if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

从代码可以看到,acquireSharedInterruptibly 方法与 acquireShared 方法几乎完全一样,不同之处仅在于前者会抛出 InterruptedException 异常响应中断;而后者仅记录标志位,获取结束后才响应。

tryAcquireSharedNanos()(响应中断,且有超时)

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);
}

doAcquireSharedNanos:该方法可与独占模式下的超时等待方法 tryAcquireNanos(int arg, long nanosTimeout) 进行对比,二者操作基本一致

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}

 release() 释放资源,唤醒节点,传播状态

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}

AQS中的Node两种模式下的场景分析

场景如下:有 T0~T4 共 5 个线程按先后顺序获取资源,其中 T2 和 T3 为共享模式,其他均为独占模式。就此场景分析:T0 先获取到资源(假设占用时间较长),而后 T1~T4 再获取则失败,会依次进入主队列。此时主队列中各个节点的状态示意图如下:

之后,T0 操作完毕并释放资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 方法的循环中继续获取资源,这时会获取成功,并将 T1 设置为头节点(T 被移除)。此时主队列节点示意图如下:

此时,T1 获取到资源并进行相关操作。而后,T1 操作完释放资源,并唤醒下一个节点 T2,T2(共享模式) 继续从 doAcquireShared(int) 方法的循环中执行。此时 T2 获取资源成功,将自身设为头节点(T1 被移除),由于后继节点 T3 也是共享模式,因此 T1 会继续唤醒T3;T3 唤醒后的操作与 T2 相同,但后继节点 T4 不是共享模式,因此不再继续唤醒。此时队列节点状态示意图如下:

此时,T2 和 T3 同时获取到资源。之后,当二者都释放资源后会唤醒 T4:

T4 获取资源的与 T1 类似。

AQS源码总结

  • AQS 是一个抽象类,无法直接进行实例化;
  • AQS 内部维护了一个核心变量 state,以及两种队列:主队列(main queue)和条件队列(condition queue);
  • AQS 提供了一套基础设施,ReentrantLock 等类通常用一个内部嵌套类 Sync 继承 AQS,并在 Sync 类中制定自己的“游戏规则”。
  • acquire: 独占模式获取资源,忽略中断;
  • acquireInterruptibly: 独占模式获取资源,响应中断;
  • tryAcquireNanos: 独占模式获取资源,响应中断,有超时;
  • release: 释放资源,唤醒主队列中的下一个线程。
  • 本文分析了以共享模式获取资源的三种方式,以及释放资源的操作。分别为:

  • acquireShared: 共享模式获取资源,忽略中断;
  • acquireSharedInterruptibly: 共享模式获取资源,响应中断;
  • tryAcquireSharedNanos: 共享模式获取资源,响应中断,有超时;
  • releaseShared: 释放资源,唤醒后继节点,并确保传播。

博文参考

AQS、Semaphore、CountDownLatch与CyclicBarrier原理及使用方法_如何心安理得的在老板眼皮下摸鱼-CSDN博客_aqs java

JUC并发核心AQS同步队列原理详解_没头脑遇到不高兴-CSDN博客

JDK源码分析-AbstractQueuedSynchronizer(1) - 知乎

JDK源码分析-AbstractQueuedSynchronizer(2) - 知乎

JDK源码分析-AbstractQueuedSynchronizer(3) - 知乎

这篇关于JDK源码——AbstractQueuedSynchronizer源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/308725

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

常用的jdk下载地址

jdk下载地址 安装方式可以看之前的博客: mac安装jdk oracle 版本:https://www.oracle.com/java/technologies/downloads/ Eclipse Temurin版本:https://adoptium.net/zh-CN/temurin/releases/ 阿里版本: github:https://github.com/

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

mac jdk 1.7 dmg 官方版

百度云下载 https://pan.baidu.com/s/1SQiidrPFF5aZr4xlx0ekoQ https://pan.baidu.com/s/1SQiidrPFF5aZr4xlx0ekoQ   补充说明: 实际上oracle对于历史版本的jdk都有归档可以在官方网站上下载,只是需要注册个号就可以了。 地址如下: https://www.oracle.com/cn/java

red5-server源码

red5-server源码:https://github.com/Red5/red5-server