面试官问我AQS中的PROPAGATE有什么用?

2023-11-03 06:31
文章标签 面试官 aqs propagate

本文主要是介绍面试官问我AQS中的PROPAGATE有什么用?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

之前分析过AQS的源码,但只分析了独占锁的原理。

而刚好我们可以借助Semaphore来分析共享锁。

如何使用Semaphore

public class SemaphoreDemo {public static void main(String[] args) {// 申请共享锁数量Semaphore sp = new Semaphore(3);for(int i = 0; i < 5; i++) {new Thread(() -> {try {// 获取共享锁sp.acquire();String threadName = Thread.currentThread().getName();// 访问APISystem.out.println(threadName + " 获取许可,访问API。剩余许可数 " + sp.availablePermits());TimeUnit.SECONDS.sleep(1);// 释放共享锁sp.release();System.out.println(threadName + " 释放许可,当前可用许可数为 " + sp.availablePermits());} catch (InterruptedException e) {e.printStackTrace();}}, "thread-" + (i+1)).start();}}
}

Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。

比较常见的需求就是我们工作中遇到的连接池、对象池、线程池等等池化资源。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

比如上面的代码就演示了同时最多只允许3个线程访问API。

如何依托AQS实现Semaphore

abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {setState(permits);}// 获取锁final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}// 释放锁protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current)throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}}

Semaphore的acquire/release还是使用到了AQS,它把state的值作为共享资源的数量。获取锁的时候state的值减去1,释放锁的时候state的值加上1。

锁的获取

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}// AQS
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

Semaphore也有公平锁和非公平锁两种实现,不过都是借助于AQS的,这里默认实现是非公平锁,所以最终会调用nonfairTryAcquireShared方法。

锁的释放

public void release() {sync.releaseShared(1);
}// AQS
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

锁的释放成功后,会调用doReleaseShared(),这个方法后面会分析。

获取锁失败

当获取锁失败后,新的线程就会被加入队列

 public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 实际调用的是NonFairSync中的nonfairTryAcquireSharedif (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

当锁的数量小于0的时候,需要入队。

共享锁调用的方法doAcquireSharedInterruptibly()和独占锁调用的方法acquireQueued()只有一些细微的区别。

区别

首先独占锁构造的节点是模式是EXCLUSIVE,而共享锁构造模式是SHARED,它们使用的是AQS中的nextWaiter变量来区分的。

其次在在准备入队的时候,如果尝试获取共享锁成功,那么会调用setHeadAndPropagate()方法,重新设置头节点并决定是否需要唤醒后继节点

private void setHeadAndPropagate(Node node, int propagate) {// 旧的头节点Node h = head;// 将当前获取到锁的节点设置为头节点setHead(node);// 如果仍然有多的锁(propagate的值是nonfairTryAcquireShared()返回值)// 或者旧的头结点为空,或者头结点的 ws 小于0// 又或者新的头结点为空,或者新头结点的 ws 小于0,则唤醒后继节点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}private void doReleaseShared() {    for (;;) {Node h = head;// 保证同步队列中至少有两个节点if (h != null && h != tail) {int ws = h.waitStatus;// 需要唤醒后继节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}// 将节点状态更新为PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

其实看到这里就有一些逻辑我看不懂了,比如setHeadAndPropagate()方法中的这一段逻辑

if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {

写成这样不好吗?

if(propagate > 0  && h.waitStatus < 0)

为什么要那么复杂呢?而且看上去这样也可以嘛,我本来想要假装看懂的,可是我发现骗自己真的不容易呀。

这里应该是有什么特殊的原因,不然Doug Lea老爷子不会这么写。。。。。。

PROPAGATE状态有什么用?

我就去网上搜索了下,结果在Java的Bug列表中发现是因为有一个bug才这样修改的

https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020

JDK Bug

看到这个bug在2011年就在JDK6中被修复了,我想说那个时候我还不知道java是啥呢。。。。

这个修改可以在Doug Lead老爷子的主页中找到,通过JSR 166找到可对比的CSV,对比1.73和1.74两个版本

http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?r1=1.73&r2=1.74

我们先看看setHeadAndPropagate中的修改对比

对比

以前的版本中判断条件是这样的

if (propagate > 0 && node.waitStatus != 0)

这样的判断很符合我的认知嘛。但是会造成怎样的问题呢?按照bug的描述,我们来纸上谈兵下没有PROPAGATE状态的时候会出什么问题。

首先 Semaphore初始化state值为0,然后4个线程分别运行4个任务。线程t1,t2同时获取锁,另外两个线程t3,t3同时释放锁

public class TestSemaphore {// 这里将信号量设置成了0private static Semaphore sem = new Semaphore(0);private static class Thread1 extends Thread {@Overridepublic void run() {// 获取锁sem.acquireUninterruptibly();}}private static class Thread2 extends Thread {@Overridepublic void run() {// 释放锁sem.release();}}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000000; i++) {Thread t1 = new Thread1();Thread t2 = new Thread1();Thread t3 = new Thread2();Thread t4 = new Thread2();t1.start();t2.start();t3.start();t4.start();t1.join();t2.join();t3.join();t4.join();System.out.println(i);}}
}

根据上面的代码,我们将信号量设置为0,所以t1,t2获取锁会失败。

假设某次循环中队列中的情况如下

head --> t1 --> t2(tail)

锁的释放由t3先释放,t4后释放

时刻1: 线程t3调用releaseShared(),然后唤醒队列中节点(线程t1),此时head的状态从-1变成0

时刻2: 线程t1由于线程t3释放了锁,被t3唤醒,然后通过nonfairTryAcquireShared()取得propagate值为0

再次获取锁

时刻3: 线程t4调用releaseShared(),读到此时waitStatue为0(和时刻1中的head是同一个head),不满足条件,因此不唤醒后继节点

diff

时刻4: 线程t1获取锁成功,调用setHeadAndPropagate(),因为不满足propagate > 0(时刻2中propagate == 0),从而不会唤醒后继节点

如果没有PROPAGATE状态,上面的情况就会导致线程t2不会被唤醒。

那在引入了propagate之后这个变量又会是怎样的情况呢?

时刻1: 线程t3调用doReleaseShared,然后唤醒队列中结点(线程t1),此时head的状态从-1变成0

时刻2: 线程t1由于t3释放了信号量,被t3唤醒,然后通过nonfairTryAcquireShared()取得propagate值为0

时刻3: 线程t4调用releaseShared(),读到此时waitStatue为0(和时刻1中的head是同一个head),将节点状态设置为PROPAGATE(值为-3)

 else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;     // loop on failed CAS

时刻4: 线程t1获取锁成功,调用setHeadAndPropagate(),虽然不满足propagate > 0(时刻2中propagate == 0),但是waitStatus<0,所以会去唤醒后继节点

至此我们知道了PROPAGATE的作用,就是为了避免线程无法会唤醒的窘境。,因为共享锁会有很多线程获取到锁或者释放锁,所以有些方法是并发执行的,就会产生很多中间状态,而PROPAGATE就是为了让这些中间状态不影响程序的正常运行。

doReleaseShared-小方法大智慧

无论是释放锁还是申请到锁都会调用doReleaseShared()方法,这个方法看似简单,其实里面的逻辑还是很精妙的。

private void doReleaseShared() {    for (;;) {Node h = head;// 保证同步队列中至少有两个节点if (h != null && h != tail) {int ws = h.waitStatus;// 需要唤醒后继节点if (ws == Node.SIGNAL) {// 可能有其他线程调用doReleaseShared(),unpark操作只需要其中一个调用就行了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;     // loop to recheck casesunparkSuccessor(h);}// 将节点状态设置为PROPAGATE(画重点了)else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;    // loop on failed CAS}if (h == head)        // loop if head changedbreak;}
}

这其中有一个判断条件

ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

这个if条件成立也巧妙

  1. 首先队列中现在至少有两个节点,简化分析,我们认为它只有两个节点,head --> node
  2. 执行到else if,说明跳过了前面的if条件,说明头结点是刚成为头结点的,它的waitStatus为0,尾节点是在这之后加入的,发生这种情况是shouldParkAfterFailedAcquire()中还没来得及将前一个节点的ws值修改为SIGNAL
  3. CAS失败说明此时头结点的ws不为0了,也就表明shouldParkAfterFailedAcquire()已经将前驱节点的waitStatus值修改为了SIGNAL了

更新前一个节点状态

而整个循环的退出条件是在h==head的时候,这个是为什么呢?

由于我们的head节点是一个虚拟节点(也可以叫做哨兵节点),假设我们的同步队列中节点顺序如下:

head --> A --> B --> C

现在假设A拿到了共享锁,那么它将成为新的dummy node(虚拟节点),

head(A) --> B --> C

此时A线程会调用doReleaseShared方法唤醒后继节点B,它很快就获取到了锁,并成为了新的头节点

head(B) --> C

此时B线程也会调用该方法,并唤醒其后继节点C,但是在B线程调用的时候,线程A可能还没有运行结束,也正在执行这个方法,
当它执行到h==head的时候发现head改变了,所以for循环就不会退出,又会继续执行for循环,唤醒后继节点。

至此我们共享锁分析完毕,其实只要弄明白了AQS的逻辑,依赖于AQS实现的Semaphore就很简单了。

在看共享锁源码过程中尤其需要注意的是方法是会被多个线程并发执行的,所以其中很多判断是多线程竞争情况下才会出现的。同时需要注意的是共享锁并不能保证线程安全,仍然需要程序员自己保证对共享资源的操作是安全的。

这篇关于面试官问我AQS中的PROPAGATE有什么用?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/336253

相关文章

【吊打面试官系列-Redis面试题】说说 Redis 哈希槽的概念?

大家好,我是锋哥。今天分享关于 【说说 Redis 哈希槽的概念?】面试题,希望对大家有帮助; 说说 Redis 哈希槽的概念? Redis 集群没有使用一致性 hash,而是引入了哈希槽的概念,Redis 集群有 16384 个哈希槽,每个 key 通过 CRC16 校验后对 16384 取模来决定放置哪个槽, 集群的每个节点负责一部分 hash 槽。

【编程底层思考】详解Java的JUC多线程并发编程底层组件AQS的作用及原理

Java中的AbstractQueuedSynchronizer(简称AQS)是位于java.util.concurrent.locks包中的一个核心组件,用于构建锁和其他同步器。AQS为实现依赖于FIFO(先进先出)等待队列的阻塞锁和相关同步器提供了一套高效、可扩展的框架。 一、AQS的作用 统一同步状态管理:AQS提供了一个int类型的成员变量state,用于表示同步状态。子类可以根据自己

面试官:synchronized的锁升级过程是怎样的?

大家好,我是大明哥,一个专注「死磕 Java」系列创作的硬核程序员。 回答 在 JDK 1.6之前,synchronized 是一个重量级、效率比较低下的锁,但是在JDK 1.6后,JVM 为了提高锁的获取与释放效,,对 synchronized 进行了优化,引入了偏向锁和轻量级锁,至此,锁的状态有四种,级别由低到高依次为:无锁、偏向锁、轻量级锁、重量级锁。 锁升级就是无锁 —>

【Unity面经】实习篇:面试官常问的一百个面试题

👨‍💻个人主页:@元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 专栏交流🧧🟥Unity100个实战基础✨🎁🟦 Unity100个精华一记✨🎁🟩 Unity50个demo案例教程✨🎁🟨 Unity100个精华细节BUG✨🎁🟨 Unity100个面试题✨🎁 文章

深入理解java并发编程之aqs框架

跟synchronized 相比较,可重入锁ReentrankLock其实原理有什么不同?   所得基本原理是为了达到一个目的;就是让所有线程都能看到某种标记。synchronized通过在对象头中设置标记实现了这一目的,是一种JVM原生的锁实现方式。而ReentrantLock以及所有的基于Lock接口的实现类,都是通过一个volitile修饰的int型变量,并保证每个线程都能拥有对该in

作为面试官的一点点感悟,谈谈技术人的成长之路

因为工作上的原因,做过几次面试官,面试的同学有应届生,也有工作3-5年的老技术人。最近也频繁作为面试官帮助筛选候选人,中间有很多值得深思的东西,我记录了下来分享给大家。 以下观点仅为个人观点,不代表任何公司的立场。        01 面试不是简单的你问我答 一般来讲,作为面试官和候选人进行沟通的第一个问题是一般是自我介绍,整个自我介绍的情况应该控制在2分钟左右,阐述自己的教育背景,项目经历

【对线面试官】阿里面试经历,有些人走一步看一步就挂了

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 这个其实说来就话长了。是小编曾经面试阿里妈妈的经历。 这次面试最终在HR面挂掉,以至于后面回忆起来,仍然是一桩美谈。 这次面试长达一个月之久,共经历了4轮技术面,1轮HR。前四轮面试过关斩将,简直开了挂一般,跟面试官正面对线,丝毫不虚。听我一一道来。 第一轮 第一面是电话面试,晚上10点半。我特么一脸问号?你们这是刚加完班吧?事实上我

【大数据哔哔集20210122】面试官问我HDFS丢不丢数据?我啪就把这个文章甩到他脸上

数据一致性 HDFS作为分布式文件系统在分布式环境下如何保证数据一致性。HDFS中,存储的文件将会被分成若干的大小一致的block分布式地存储在不同的机器上,需要NameNode节点来对这些数据进行管理,存储这些block的结点称为DataNode,NameNode是用来管理这些元数据的。 NameNode保证元数据的一致性 客户端上传文件时,NameNode首先往edits log文件

几乎每一位面试官都会关注的能力,你做到了吗?

又到了金九银十招聘季,虽然说大环境不好,但对于不少想要挪窝的同学来说,这个时间段还是一个不错的窗口期。 我也借此机会在Boss上看了不少岗位,发现很多岗位JD都有一条关于“功能设计规范”的要求。 相比较于设计岗的设计规范原则,产品岗的设计规范会要求你对业务、产品有更强的纵深性,但这种基础且重要的能力被太多人忽视了。 因此,我列举了以下11点产品设计规范,同学们可以自查一下看看日常有没有做到

Java并发之AQS与自旋锁(利用CAS实现)

一、概述   谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer(AQS)!   类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch...。   以下是本文的目录大