aqs中关于propagate状态的思考

2023-11-03 06:31
文章标签 思考 状态 aqs propagate

本文主要是介绍aqs中关于propagate状态的思考,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

https://blog.csdn.net/cq_pf/article/details/113387256?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-113387256-blog-110535122.pc_relevant_default&spm=1001.2101.3001.4242.1&utm_relevant_index=2

之前分析过AQS的源码,但只分析了独占锁的原理。

而刚好我们可以借助Semaphore来分析共享锁。

如何使用Semaphore

public class SemaphoreDemo {
​public static void main(String[] args) {
​// 申请共享锁数量Semaphore sp = new Semaphore(3);
​for(int i = 0; i < 5; i++) {new Thread(() -> {try {
​// 获取共享锁sp.acquire();
​String threadName = Thread.currentThread().getName();
​// 访问APISystem.out.println(threadName + " 获取许可,访问API。剩余许可数 " + sp.availablePermits());
​TimeUnit.SECONDS.sleep(1);// 释放共享锁sp.release();
​System.out.println(threadName + " 释放许可,当前可用许可数为 " + sp.availablePermits());
​} catch (InterruptedException e) {e.printStackTrace();}
​}, "thread-" + (i+1)).start();}}
}
12345678910111213141516171819202122232425262728293031323334

Java SDK 里面提供了 Lock,为啥还要提供一个 Semaphore ?其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。

比较常见的需求就是我们工作中遇到的连接池、对象池、线程池等等池化资源。其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

比如上面的代码就演示了同时最多只允许3个线程访问API。

如何依托AQS实现Semaphore

abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {setState(permits);}
​// 获取锁final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
​// 释放锁protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current)throw new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
​
}
​
123456789101112131415161718192021222324252627282930

Semaphore的acquire/release还是使用到了AQS,它把state的值作为共享资源的数量。获取锁的时候state的值减去1,释放锁的时候state的值加上1。

锁的获取

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
​
// AQS
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
​
12345678910111213

Semaphore也有公平锁和非公平锁两种实现,不过都是借助于AQS的,这里默认实现是非公平锁,所以最终会调用nonfairTryAcquireShared方法。

锁的释放

public void release() {sync.releaseShared(1);
}
​
// AQS
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}
​
12345678910111213

锁的释放成功后,会调用doReleaseShared(),这个方法后面会分析。

获取锁失败

当获取锁失败后,新的线程就会被加入队列

 public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 实际调用的是NonFairSync中的nonfairTryAcquireSharedif (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
12345678

当锁的数量小于0的时候,需要入队。

共享锁调用的方法doAcquireSharedInterruptibly()和独占锁调用的方法acquireQueued()只有一些细微的区别。

区别

首先独占锁构造的节点是模式是EXCLUSIVE,而共享锁构造模式是SHARED,它们使用的是AQS中的nextWaiter变量来区分的。

其次在在准备入队的时候,如果尝试获取共享锁成功,那么会调用setHeadAndPropagate()方法,重新设置头节点并决定是否需要唤醒后继节点

private void setHeadAndPropagate(Node node, int propagate) {// 旧的头节点Node h = head;// 将当前获取到锁的节点设置为头节点setHead(node);
​// 如果仍然有多的锁(propagate的值是nonfairTryAcquireShared()返回值)// 或者旧的头结点为空,或者头结点的 ws 小于0// 又或者新的头结点为空,或者新头结点的 ws 小于0,则唤醒后继节点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}
}
​
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}
​
private void doReleaseShared() {    for (;;) {Node h = head;// 保证同步队列中至少有两个节点if (h != null && h != tail) {int ws = h.waitStatus;
​// 需要唤醒后继节点if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}// 将节点状态更新为PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}
​
12345678910111213141516171819202122232425262728293031323334353637383940414243444546

其实看到这里就有一些逻辑我看不懂了,比如setHeadAndPropagate()方法中的这一段逻辑

if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {
​
123

写成这样不好吗?

if(propagate > 0  && h.waitStatus < 0)
1

为什么要那么复杂呢?而且看上去这样也可以嘛,我本来想要假装看懂的,可是我发现骗自己真的不容易呀。

这里应该是有什么特殊的原因,不然Doug Lea老爷子不会这么写。。。。。。

PROPAGATE状态有什么用?

我就去网上搜索了下,结果在Java的Bug列表中发现是因为有一个bug才这样修改的

Bug ID: JDK-6801020 Concurrent Semaphore release may cause some require thread not signaled

JDK Bug

看到这个bug在2011年就在JDK6中被修复了,我想说那个时候我还不知道java是啥呢。。。。

这个修改可以在Doug Lead老爷子的主页中找到,通过JSR 166找到可对比的CSV,对比1.73和1.74两个版本

http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?r1=1.73&r2=1.74

我们先看看setHeadAndPropagate中的修改对比

对比

以前的版本中判断条件是这样的

if (propagate > 0 && node.waitStatus != 0)
1

这样的判断很符合我的认知嘛。但是会造成怎样的问题呢?按照bug的描述,我们来纸上谈兵下没有PROPAGATE状态的时候会出什么问题。

首先 Semaphore初始化state值为0,然后4个线程分别运行4个任务。线程t1,t2同时获取锁,另外两个线程t3,t3同时释放锁

aqs中队列的节点有多个状态,signal,cancelled,condition,propagate,0这几个状态,其他的状态都很好理解,但是对于propagate这个状态很难理解,它只在doReleaseShared这个方法中会切换到这个状态。在jdk1.6之前是没有这个状态的,引入这个状态是因为一个jdk bug。这个bug展示的是四个线程中两个线程来acquireShared,两个线程来releaseShared,则有可能有一个线程会挂住,一直在等待其他线程的唤醒。下面是具体的代码实现。
​
import java.util.concurrent.Semaphore;
public class TestSemaphore {private static Semaphore sem = new Semaphore(0);private static class Thread1 extends Thread {@Overridepublic void run() {sem.acquireUninterruptibly();}}private static class Thread2 extends Thread {@Overridepublic void run() {sem.release();}}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000000; i++) {Thread t1 = new Thread1();Thread t2 = new Thread1();Thread t3 = new Thread2();Thread t4 = new Thread2();t1.start();t2.start();t3.start();t4.start();t1.join();t2.join();t3.join();t4.join();System.out.println(i);}}
}
在具体分析之前先看一下jdk1.6之前的代码的逻辑,可以看到其在确定需不需要进行unpark后续节点时是当前的节点上有signal并且当前已知还有剩余信号量。从而产生了上面的bug.具体的jdk的此次更改可以看这里。
​
​
​
下面来详细分析一下上面的代码的逻辑以及其最终产生挂起一个节点的原因。
​
假设某一时刻t1和t2节点都没获取到信号量,并且都加入到了队列中,此时aqs的状态如下面第一张图展示的。
此时t3线程调用relased,释放了一个信号量,并且此时由于此时的head的标识是signal,所以它唤醒t1线程,并且此时的state由于t3的释放1,导致了state=1
此时t1节点被唤醒后会自旋尝试获取信号量,当其获取了信号量后,其所知道此时的剩余的信号量为0,即其传入setHeadAndPropagate的propagate为0
此时t4线程释放信号量,将当前的state状态切换为1,但是此时的head还是原来的head,这个head的状态为0,不会对后续节点进行unpark操作
再然后t1线程将head切换切换为其对应的node,但是由于其propagate为0,故此也不会进行unpark后续节点,所以这个队列最终会维持在最后一张图的状态,然后t2线程永远也收不到unpark而进行阻塞了。
​
​
下面再分析一下当前的代码逻辑。新的代码对head之前和更新之后的head都进行了判断,并且对与propagate>0也时进行了或的判断,对与上面的情况主要时因为其两个判断时与的逻辑,而新的判断都是或的条件,这些判断可能会造成多余的unpark。
​
//当前节点竞争到共享变量后,将heaad设置为当前节点,并且在当前节点
//设置为head之时因为当前以shared形式竞争到共享变量,则表示有非共享的进行了释放
//所以可以唤醒后续的共享的等待节点
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);//propagate>0表示当前还有信号量可以给后续节点使用//waitStatus<0表示其处于propogate或者signal的状态,if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//这里s为null表示其不知道后续节点是什么,所以也尝试releaseShared//不过next节点为空的情况是当新加入节点时时先将新加的节点通过cas设置为tail,然后再修改//其前面的节点的next为新加入节点的,所以在这中间,当前节点tail节点时其也可能为空的if (s == null || s.isShared())doReleaseShared();}
}
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) { //这里表示队列中至少有两个节点int ws = h.waitStatus;if (ws == Node.SIGNAL) {//当前head是signal,表示其后继需要unpark,则进行unparkif (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck casesunparkSuccessor(h);}//当前head状态为0,表示其后继节点还没有进行park的,但是需要将当前节点切换为propagate,表示//其需要传播releasedelse if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   //h不等于head则表示在当前操作时中间发生了head的变化break;}
}
还有就是引入了propagate这个状态,这个状态我个人认为不是必要的。
​
首先上面Bug的情况只要判断改为或,即将propagate>0&&node.waitStatus != 0这个判断改位或,那么t1线程再setHead之后因为其状态为signal,则可以唤醒t2线程
如果此时t1的状态不为signal,即t2节点是在t1线程进行了判断之后加入的,即此时t1为的状态为0,则表示t2线程此时一定还没有进行park,其会在自己的自旋线程中获取到共享变量成功

根据上面的代码,我们将信号量设置为0,所以t1,t2获取锁会失败。

假设某次循环中队列中的情况如下

head --> t1 --> t2(tail)
1

锁的释放由t3先释放,t4后释放

时刻1: 线程t3调用releaseShared(),然后唤醒队列中节点(线程t1),此时head的状态从-1变成0

时刻2: 线程t1由于线程t3释放了锁,被t3唤醒,然后通过nonfairTryAcquireShared()取得propagate值为0

再次获取锁

时刻3: 线程t4调用releaseShared(),读到此时waitStatue为0(和时刻1中的head是同一个head),不满足条件,因此不唤醒后继节点

diff

时刻4: 线程t1获取锁成功,调用setHeadAndPropagate(),因为不满足propagate > 0(时刻2中propagate == 0),从而不会唤醒后继节点

如果没有PROPAGATE状态,上面的情况就会导致线程t2不会被唤醒。

那在引入了propagate之后这个变量又会是怎样的情况呢?

时刻1: 线程t3调用doReleaseShared,然后唤醒队列中结点(线程t1),此时head的状态从-1变成0

时刻2: 线程t1由于t3释放了信号量,被t3唤醒,然后通过nonfairTryAcquireShared()取得propagate值为0

时刻3: 线程t4调用releaseShared(),读到此时waitStatue为0(和时刻1中的head是同一个head),将节点状态设置为PROPAGATE(值为-3)

 else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;     // loop on failed CAS
123

时刻4: 线程t1获取锁成功,调用setHeadAndPropagate(),虽然不满足propagate > 0(时刻2中propagate == 0),但是waitStatus<0,所以会去唤醒后继节点

至此我们知道了PROPAGATE的作用,就是为了避免线程无法会唤醒的窘境。,因为共享锁会有很多线程获取到锁或者释放锁,所以有些方法是并发执行的,就会产生很多中间状态,而PROPAGATE就是为了让这些中间状态不影响程序的正常运行。

doReleaseShared-小方法大智慧

无论是释放锁还是申请到锁都会调用doReleaseShared()方法,这个方法看似简单,其实里面的逻辑还是很精妙的。

private void doReleaseShared() {    for (;;) {Node h = head;// 保证同步队列中至少有两个节点if (h != null && h != tail) {int ws = h.waitStatus;
​// 需要唤醒后继节点if (ws == Node.SIGNAL) {// 可能有其他线程调用doReleaseShared(),unpark操作只需要其中一个调用就行了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;     // loop to recheck casesunparkSuccessor(h);}// 将节点状态设置为PROPAGATE(画重点了)else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;    // loop on failed CAS}if (h == head)        // loop if head changedbreak;}
}
​
123456789101112131415161718192021222324

这其中有一个判断条件

ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
1

这个if条件成立也巧妙

  1. 首先队列中现在至少有两个节点,简化分析,我们认为它只有两个节点,head --> node

  2. 执行到else if,说明跳过了前面的if条件,说明头结点是刚成为头结点的,它的waitStatus为0,尾节点是在这之后加入的,发生这种情况是shouldParkAfterFailedAcquire()中还没来得及将前一个节点的ws值修改为SIGNAL

  3. CAS失败说明此时头结点的ws不为0了,也就表明shouldParkAfterFailedAcquire()已经将前驱节点的waitStatus值修改为了SIGNAL了

更新前一个节点状态

而整个循环的退出条件是在h==head的时候,这个是为什么呢?

由于我们的head节点是一个虚拟节点(也可以叫做哨兵节点),假设我们的同步队列中节点顺序如下:

head --> A --> B --> C

现在假设A拿到了共享锁,那么它将成为新的dummy node(虚拟节点),

head(A) --> B --> C

此时A线程会调用doReleaseShared方法唤醒后继节点B,它很快就获取到了锁,并成为了新的头节点

head(B) --> C

此时B线程也会调用该方法,并唤醒其后继节点C,但是在B线程调用的时候,线程A可能还没有运行结束,也正在执行这个方法, 当它执行到h==head的时候发现head改变了,所以for循环就不会退出,又会继续执行for循环,唤醒后继节点。

至此我们共享锁分析完毕,其实只要弄明白了AQS的逻辑,依赖于AQS实现的Semaphore就很简单了。

在看共享锁源码过程中尤其需要注意的是方法是会被多个线程并发执行的,所以其中很多判断是多线程竞争情况下才会出现的。同时需要注意的是共享锁并不能保证线程安全,仍然需要程序员自己保证对共享资源的操作是安全的。

这篇关于aqs中关于propagate状态的思考的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/336257

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1565(状态压缩)

本人第一道ac的状态压缩dp,这题的数据非常水,很容易过 题意:在n*n的矩阵中选数字使得不存在任意两个数字相邻,求最大值 解题思路: 一、因为在1<<20中有很多状态是无效的,所以第一步是选择有效状态,存到cnt[]数组中 二、dp[i][j]表示到第i行的状态cnt[j]所能得到的最大值,状态转移方程dp[i][j] = max(dp[i][j],dp[i-1][k]) ,其中k满足c

状态dp总结

zoj 3631  N 个数中选若干数和(只能选一次)<=M 的最大值 const int Max_N = 38 ;int a[1<<16] , b[1<<16] , x[Max_N] , e[Max_N] ;void GetNum(int g[] , int n , int s[] , int &m){ int i , j , t ;m = 0 ;for(i = 0 ;

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

hdu3006状态dp

给你n个集合。集合中均为数字且数字的范围在[1,m]内。m<=14。现在问用这些集合能组成多少个集合自己本身也算。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.Inp

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

状态模式state

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/state 在一个对象的内部状态变化时改变其行为, 使其看上去就像改变了自身所属的类一样。 在状态模式中,player.getState()获取的是player的当前状态,通常是一个实现了状态接口的对象。 onPlay()是状态模式中定义的一个方法,不同状态下(例如“正在播放”、“暂停

qml states 状态

states 状态 在QML中,states用于定义对象在不同状态下的属性变化。每个状态可以包含一组属性设置,当状态改变时,这些属性设置会被应用到对象上。 import QtQuick 2.15import QtQuick.Controls 2.15// 定义应用程序的主窗口ApplicationWindow {visible: true // 使窗口可见width: 640 /

[轻笔记] ubuntu Shell脚本实现监视指定进程的运行状态,并能在程序崩溃后重启动该程序

根据网上博客实现,发现只能监测进程离线,然后对其进行重启;然而,脚本无法打印程序正常状态的信息。自己通过不断修改测试,发现问题主要在重启程序的命令上(需要让重启的程序在后台运行,不然会影响监视脚本进程,使其无法正常工作)。具体程序如下: #!/bin/bashwhile [ 1 ] ; dosleep 3if [ $(ps -ef|grep exe_name|grep -v grep|