【并发基础】AQS详解

2024-06-07 21:38
文章标签 基础 详解 并发 aqs

本文主要是介绍【并发基础】AQS详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLockReentrantReadWriteLockSemaphore等)。

核心数据结构

AQS属性

//队列头指针
private transient volatile Node head;
//队列尾指针
private transient volatile Node tail;
//同步状态
private volatile int state;

Node结点属性

//代表共享模式
static final Node SHARED = new Node();
//代表独占模式
static final Node EXCLUSIVE = null;//等待状态值,下文会具体分析
static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;//前驱指针
volatile Node prev;
//后继指针
volatile Node next;
//线程
volatile Thread thread;
//
Node nextWaiter;

AQS维护了一个 volatile int state(代表共享资源)和一个FIFO线程同步队列(多线程争用资源被阻塞时会进入此队列)。

在这里插入图片描述

自定义同步器

自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程同步队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

源码解析

独占式同步状态的获取和释放

独占式,同一时刻仅有一个线程持有同步状态。

独占式获取

acquire(int arg) 独占式获取同步状态,但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。源码如下:

public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}

函数流程如下:

  1. tryAcquire尝试直接获取资源,如果成功则直接返回;
  2. addWaiter将该线程加入同步队列的尾部,并标记为独占模式;
  3. acquireQueued线程在等待队列中自旋获取资源,获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt,将中断补上。
tryAcquire(int)
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}

该方法为protected方法,主要由子类进行实现;自定义同步器时就必须实现该方法。

addWaiter(Node)
//共享式结点
static final Node SHARED = new Node();
//独占式结点
static final Node EXCLUSIVE = null;//mode指定是独占式还是共享式
private Node addWaiter(Node mode) {//根据当前线程构建Node结点,并指定结点类型:独占还是共享Node node = new Node(Thread.currentThread(), mode);//如果此时队列不为空,则直接将结点加到队尾 Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//若队列为空,则初始化队列,并且head和tail指针均指向该结点enq(node);return node;
}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}

Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有5种取值CANCELLEDSIGNALCONDITIONPROPAGATE 和 0。

//线程由于超时或中断会处于该状态,结点一旦处于该结点,后续状态就无法变更,也不会阻塞
static final int CANCELLED =  1;//当前结点的后继结点被阻塞,当前结点释放资源或cancel时,需要唤醒其后继结点
static final int SIGNAL    = -1;//当前结点目前处于condition队列中
static final int CONDITION = -2;//该状态与共享模式相关,下面会进行分析
static final int PROPAGATE = -3;//0表示初始化状态
acquireQueued(Node, int)

通过 tryAcquireaddWaiter,该线程获取资源失败,已经被放入同步队列尾部了,此时线程会通过自旋来尝试获取资源。

final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();//如果结点的前驱结点为头结点,并且获取同步资源成功if (p == head && tryAcquire(arg)) {//设置当前结点为头结点setHead(node);// help GCp.next = null; failed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

接下来我们分析下shouldParkAfterFailedAcquireparkAndCheckInterrupt方法。

shouldParkAfterFailedAcquire(Node, Node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//获取前驱结点的等待状态,如果前驱结点的等待状态已经是SIGNAL,则直接返回true,表明当前结点只需等待唤醒int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;if (ws > 0) {//前驱结点已经cancel,则需要一直往前找,直到找到最近的一个正常状态的结点,并将其作为当前结点的前驱结点。//中间处于无效状态的结点,因为引用不可达,稍后会被GCdo {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱处于正常状态,则把前驱状态设置为SIGNALcompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;
}

该方法的作用就是把当前结点排到等待状态为SIGNAL的结点之后,找到该安全休息点之后就调用parkAndCheckInterrupt使当前线程进入等待状态。

parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
小结

独占式获取同步状态的流程如下:
在这里插入图片描述

独占式释放

当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态。AQS提供了release(int arg)方法释放同步状态。

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;
}

先调用自定义同步器自定义的tryRelease(int arg)方法来释放同步状态,释放成功后,会调用unparkSuccessor(Node node)方法唤醒后继节点。

unparkSuccessor(Node)
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//如果当前结点的后继结点不为空,则直接通过unpark唤醒//如果当前结点的后继结点为空,则从尾结点开始遍历,找到最前边的正常状态的结点,然后调用unpark唤醒该结点Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

该方法的核心在于 unpark 唤醒等待队列中最前边的那个未放弃线程

共享式同步状态的获取和释放

共享式与独占式的最主要区别在于同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。

共享式获取

AQS提供acquireShared(int arg)方法共享式获取同步状态.

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);
}

函数流程如下:

  1. 调用tryAcquireShared(int arg)获取指定量的资源,负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取;
  2. 获取失败则调用doAcquireShared(int arg)将当前线程加入同步队列,以自旋方式获取同步状态。
tryAcquireShared
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

该方法为protected方法,主要由子类进行实现;自定义同步器时就必须实现该方法。

doAcquireShared
private void doAcquireShared(int arg) {//将当前线程以共享模式结点加入同步队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {//前驱结点为头结点,则尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) {//获取成功后将当前结点设为头结点,还有剩余资源可以再唤醒后面结点setHeadAndPropagate(node, r);// help GCp.next = null; if (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时,才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park等待其他线程释放资源,也更不会去唤醒老三和老四了。

setHeadAndPropagate
 private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //设置当前结点为头结点setHead(node);//如果还有剩余资源,会继续唤醒后继结点if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}
  • 与独占式获取资源的差异在于,当前结点获取到资源后,如果资源还有剩余,会继续去唤醒后继结点。
共享式释放

releaseShared(int arg)是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒同步队列里的其他线程来获取资源。

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
tryReleaseShared
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
doReleaseShared
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;//头结点的等待状态为SIGNAL时,将等待状态更改为0,更新成功后唤醒后继结点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            unparkSuccessor(h);}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                }//如果头结点未发生变更,则中断该循环;//如果头结点发生了变更,则继续该循环,继续唤醒后继结点;if (h == head)                   break;}
}

因为可能会存在多个线程同时进行释放同步状态资源,所以需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。

资料参考

Java并发之AQS详解

死磕Java并发

这篇关于【并发基础】AQS详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1040368

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

Spring Cloud LoadBalancer 负载均衡详解

《SpringCloudLoadBalancer负载均衡详解》本文介绍了如何在SpringCloud中使用SpringCloudLoadBalancer实现客户端负载均衡,并详细讲解了轮询策略和... 目录1. 在 idea 上运行多个服务2. 问题引入3. 负载均衡4. Spring Cloud Load

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

在 Spring Boot 中使用 @Autowired和 @Bean注解的示例详解

《在SpringBoot中使用@Autowired和@Bean注解的示例详解》本文通过一个示例演示了如何在SpringBoot中使用@Autowired和@Bean注解进行依赖注入和Bean... 目录在 Spring Boot 中使用 @Autowired 和 @Bean 注解示例背景1. 定义 Stud

如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解

《如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解》:本文主要介绍如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别的相关资料,描述了如何使用海康威视设备网络SD... 目录前言开发流程问题和解决方案dll库加载不到的问题老旧版本sdk不兼容的问题关键实现流程总结前言作为

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

SQL 中多表查询的常见连接方式详解

《SQL中多表查询的常见连接方式详解》本文介绍SQL中多表查询的常见连接方式,包括内连接(INNERJOIN)、左连接(LEFTJOIN)、右连接(RIGHTJOIN)、全外连接(FULLOUTER... 目录一、连接类型图表(ASCII 形式)二、前置代码(创建示例表)三、连接方式代码示例1. 内连接(I

Go路由注册方法详解

《Go路由注册方法详解》Go语言中,http.NewServeMux()和http.HandleFunc()是两种不同的路由注册方式,前者创建独立的ServeMux实例,适合模块化和分层路由,灵活性高... 目录Go路由注册方法1. 路由注册的方式2. 路由器的独立性3. 灵活性4. 启动服务器的方式5.

Java中八大包装类举例详解(通俗易懂)

《Java中八大包装类举例详解(通俗易懂)》:本文主要介绍Java中的包装类,包括它们的作用、特点、用途以及如何进行装箱和拆箱,包装类还提供了许多实用方法,如转换、获取基本类型值、比较和类型检测,... 目录一、包装类(Wrapper Class)1、简要介绍2、包装类特点3、包装类用途二、装箱和拆箱1、装