JUC并发编程--------AQS以及各类锁

2023-10-12 03:52

本文主要是介绍JUC并发编程--------AQS以及各类锁,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AQS

什么是AQS

java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队
列、独占获取、共享获取等,而这些行为的抽象就是基于 AbstractQueuedSynchronizer(简称
AQS) 实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。
JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的
一般是通过一个内部类Sync继承 AQS
将同步器所有调用都映射到Sync对应的方法

AQS具备的特性:

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

AQS核心结构

AQS内部维护属性  volatile int state
state表示资源的可用状态
State三种访问方式:
  • getState()
  • setState()
  • compareAndSetState()
定义了两种资源访问方式:
  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。 尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。 尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。 尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。 尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
AQS定义两种队列
同步等待队列: 主要用于维护获取锁失败时入队的线程。
条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁
AQS 定义了5个队列中节点状态:
  • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

ReentrantLock

ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。
它的功能类似于synchronized是一种互斥锁,可以保证线程安全。相对于 synchronized,
ReentrantLock具备如下特点:
  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入
它的主要应用场景是 在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。

常用API

ReentrantLock实现了Lock接口规范,常见API如下:

void lock()
获取锁,调用该方法当前线程会获取锁,当锁获
得后,该方法返回
void lockInterruptibly() throws InterruptedException
可中断的获取锁,和lock()方法不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线
boolean tryLock()
尝试非阻塞的获取锁,调用该方法后立即返回。
如果能够获取到返回true,否则返回false
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException
超时获取锁,当前线程在以下三种情况下会被返回:
当前线程在超时时间内获取了锁
当前线程在超时时间内被中断
超时时间结束,返回false
void unlock()
释放锁
Condition newCondition()
获取等待通知组件,该组件和当前的锁绑定,当
前线程只有获取了锁,才能调用该组件的await()
方法,而调用后,当前线程将释放锁
在使用时要注意 4 个问题:
  • 默认情况下 ReentrantLock 为非公平锁而非公平锁;
  • 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
  • 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
  • 释放锁一定要放在 finally 中,否则会导致线程阻塞。

公平锁与非公平锁使用

ReentrantLock支持公平锁和非公平锁两种模式:
公平锁:线程在获取锁时,按照等待的先后顺序获取锁。
非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
1 ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
2 ReentrantLock lock = new ReentrantLock(true); //公平锁//非公平锁final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}//公平锁
final void lock() {acquire(1);}//两者的区别在于非公平锁刚进来则会先用当前线程去尝试获取锁,如果获取失败再走acquire流程

可重入锁

 protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {//如果当前线程是获得锁的线程,表示重入的过程,这时候则需要将锁状态进行增加即可int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}

源码解析

公平锁:

首先先看公平锁类FairSync,其中包含了lock() 以及tryAcquire(int acquires) 方法

    static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

要加锁的时候我们首先调用lock方法, 其中会调用acquire(1)方法

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

调用acquire方法之后,由于我们使用的是公平锁,紧接着就会先调用公平锁的tryAcquire方法

 protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

进入tryAcquire方法之后

首先先通过getState获取状态,当状态值为0的时候,表示锁未被持有,于是进入

if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current)

的判断,在hasQueuedPredecessors()方法中,会判断线程是否有等待阻塞队列,如果有等待阻塞队列,并且头节点是当前线程,则返回false,加上'!'的非判断,就可以进入 compareAndSetState(0, acquires),尝试修改锁的状态为持有,如果成功则调用 setExclusiveOwnerThread(current);方法将当前线程设为锁持有线程。

如果当前状态不为0的时候,表示锁已经被持有,这时候通过if (current == getExclusiveOwnerThread())方法判断锁的持有则是不是当前线程,如果是当前线程,则将锁的持有状态+1,代表重入锁

  public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}

if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current)

只有该方法返回false的情况,才能进入下一步通过cas获取锁。

在该方法中:

判断h!=t,表示头节点不等于尾节点,才能继续判断,否则就是头节点等于尾节点表示阻塞队列为空或则阻塞队列只有一个线程,那么可以直接返回false

当h!=t的时候,表示存在阻塞队列,并且存在多个线程在等待,此时需要继续判断,当s=h.next)==null的时候,表示还不存在阻塞队列,则返回true,需要先创建阻塞队列

当s=h.next)!=null的时候,表示已经存在阻塞队列了,这时候需要进行下一步判断

当s.thread==Thread.currentThread()的时候,表示头节点的下一个节点是当前线程,则返回false准备尝试获取锁

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

掉调用tryAcquire方法之后,如果没有成功获取锁,就会将进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法

进入acquireQueued方法之前,先通过addWaiter(Node.EXCLUSIVE)用当前线程入参创建一个节点。

 private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}

通过 Node node = new Node(Thread.currentThread(), mode);将当前线程放入新创的节点中,从上一个方法可以看出传入的mode是Node.EXCLUSIVE,表示独占锁

创建好节点之后,首先判断tail尾节点是否为为空,如果不为空,则将当前节点的前驱节点设置为尾节点,然后通过compareAndSetTail(pred, node)方法,尝试将aqs维护的尾节点换成刚创建的界定啊,如果cas成功,则将原先的尾节点的后序节点设置成当前节点,至此新创建的节点变为尾节点。

如果tail==null,表示还没存在尾节点,这时候需要调用enq(node);方法

   private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

进入enq方法之后,会循环创建尾节点,继续通过tail==null进行二次判断,如果当前尾节点还是null,则cas新建一个节点作为头节点(初始化),完成之后再设置尾节点等于头节点。

然后进入第二次循环,此时tail肯定!=null,则将传入进来的新节点的前驱节点设置为尾节点,然后尝试交换尾节点,成功之后继续将原先的尾节点的后序节点设置为传入的新节点,至此,新线程入队成功,此时addWaiter方法调用完毕

    public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

执行完addwaiter方法之后,接着执行acquireQueued方法

   final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

在该方法中,通过 final Node p = node.predecessor();获取当前线程节点的前驱节点,

如果前驱节点是头节点,表示当前节点排第二,可以通过tryAcquire获取锁,获取锁成功,则通过setHead(node);方法,将节点线程设置为null(因为之后不在需要阻塞与唤醒了),

并且返回fase(该返回值有特别的意义,如果返回true,方法结束之后会接着调用selfInterrupt();方法,将中断标记位设置为true,后面不会在被park中断),如果获取锁失败,就会进入shouldParkAfterFailedAcquire(p, node)方法判断是否需要进行中断,如果需要中断则调用parkAndCheckInterrupt()方法中断,中断结束之后就会将interrupted设置为true并返回。

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}

该方法是为了判断是否需要进行中断,比较巧妙,

首先进来会先判断前驱节点的状态,默认都是0,而该方法的目的就是要即将前驱节点状态设置成-1,在-1状态下,如果前驱节点释放锁,就会激活该节点,然后重新尝试获取锁。

从源码上可以看到,当线程调用lock的时候,进来这个方法,前驱节点都是0,然后设置完前驱节点的状态为-1之后,会返回一个false,在外层又会进行一次自旋,如果前驱节点是头节点,则可以尝试通过cas尝试获取锁,当获取锁失败再次进来的时候,通过 if (ws == Node.SIGNAL)判断成功,直接返回ture,表示需要中断

    /*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

线程park然后被中断之后,执行Thread.interrupted();清空中断标记位,避免下次无法中断造成不停的自旋不断消耗cpu,至此,lock全过程就完成了

获取锁完成业务之后开始释放锁

  public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

调用unlock方法,然后调用release方法,传入状态值为1

   protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}

先通过tryRelease方法将state-1(加锁的时候是+1,释放锁则需要-1还原),如果状态值为0,则表示释放锁成功,此时通过   setExclusiveOwnerThread(null);将锁持有线程设置为null

如果不为0,则及继续返回一个false,表示锁还没有释放完成,仍然被持有(重入锁状态);

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}

如果释放锁成功,返回ture之后,进入方法体,判断头节点是否为为空,此时头节点就是自己,肯定不为空。并且waitStatus在下一个线程进入阻塞队列之前就已经被设置为-1,所以顺利执行unparkSuccessor(h);方法

    private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

进入该方法后,waitStatus的值为-1,小于0,于是开始通过cas将waitStatus设置为0。

紧接着尝试找到当前节点的下一个节点,如果不为null,则通过unpark唤醒该线程

这篇关于JUC并发编程--------AQS以及各类锁的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/193052

相关文章

菲律宾诈骗,请各位华人朋友警惕各类诈骗。

骗子招聘类型:程序开发、客服、财务、销售总管、打字员等 如果有人用高薪、好的工作环境来你出国工作。要小心注意!因为这些骗子是成群结伴的! 只要你进入一个菲律宾的群,不管什么类型的群都有这些骗子团伙。基本上是他们控制的! 天天在群里有工作的信息,工作信息都是非常诱惑人的。例如招“打字员”、“客服”、“程序员”……各种信息都有。只要你提交简历了,他会根据你的简历判断你这个人如何。所谓的心理战嘛!

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

高并发环境中保持幂等性

在高并发环境中保持幂等性是一项重要的挑战。幂等性指的是无论操作执行多少次,其效果都是相同的。确保操作的幂等性可以避免重复执行带来的副作用。以下是一些保持幂等性的常用方法: 唯一标识符: 请求唯一标识:在每次请求中引入唯一标识符(如 UUID 或者生成的唯一 ID),在处理请求时,系统可以检查这个标识符是否已经处理过,如果是,则忽略重复请求。幂等键(Idempotency Key):客户端在每次

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

生信代码入门:从零开始掌握生物信息学编程技能

少走弯路,高效分析;了解生信云,访问 【生信圆桌x生信专用云服务器】 : www.tebteb.cc 介绍 生物信息学是一个高度跨学科的领域,结合了生物学、计算机科学和统计学。随着高通量测序技术的发展,海量的生物数据需要通过编程来进行处理和分析。因此,掌握生信编程技能,成为每一个生物信息学研究者的必备能力。 生信代码入门,旨在帮助初学者从零开始学习生物信息学中的编程基础。通过学习常用