java.util.concurrent 包源码分析之线程同步辅助

2024-05-04 09:08

本文主要是介绍java.util.concurrent 包源码分析之线程同步辅助,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CyclicBarrier

CyclicBarrier是一个用于线程同步的辅助类,它允许一组线程等待彼此,直到所有线程都到达集合点,然后执行某个设定的任务。现实中有个很好的例子来形容:几个人约定了某个地方集中,然后一起出发去旅行。每个参与的人就是一个线程,CyclicBarrier就是那个集合点,所有人到了之后,就一起出发。

CyclicBarrier的构造函数有两个:

// parties是参与等待的线程的数量,barrierAction是所有线程达到集合点之后要做的动作
public CyclicBarrier(int parties, Runnable barrierAction);// 达到集合点之后不执行操作的构造函数
public CyclicBarrier(int parties)

需要说明的是,CyclicBarrier只是记录线程的数目,CyclicBarrier是不创建任何线程的。线程是通过调用CyclicBarrier的await方法来等待其他线程,如果调用await方法的线程数目达到了预设值,也就是上面构造方法中的parties,CyclicBarrier就会开始执行barrierAction。

因此我们来看CyclicBarrier的核心方法dowait,也就是await方法调用的私有方法:

   private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// count就是预设的parties,count减1的值表示还剩余几个// 线程没有达到该集合点int index = --count;// index为0表示所有的线程都已经达到集合点,这时// 占用最后一个线程,执行运行设定的任务if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 唤醒其他等待的线程,// 更新generation以便下一次运行nextGeneration();return 0;} finally {// 如果运行任务时发生异常,设置状态为broken// 并且唤醒其他等待的线程if (!ranAction)breakBarrier();}}// 还有线程没有调用await,进入循环等待直到其他线程// 达到集合点或者等待超时for (;;) {try {// 如果没有设置超时,进行无超时的等待if (!timed)trip.await();// 有超时设置,进行有超时的等待else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// generation如果没有被更新表示还是当前的运行// (generation被更新表示集合完毕并且任务成功),// 在状态没有被设置为broken状态的情况下,遇到线程// 中断异常表示当前线程等待失败,需要设置为broken// 状态,并且抛出中断异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// else对应的条件为:g != generation || g.broken// 表示要么generation已经被更新意味着所有线程已经到达// 集合点并且任务执行成功,要么就是是broken状态意味着// 任务执行失败,无论哪种情况所有线程已经达到集合点,当// 前线程要结束等待了,发生了中断异常,需要中断当前线程// 表示遇到了中断异常。Thread.currentThread().interrupt();}}// 如果发现当前状态为broken,抛出异常if (g.broken)throw new BrokenBarrierException();// generation被更新表示所有线程都已经达到集合点// 并且预设任务已经完成,返回该线程进入等待顺序号if (g != generation)return index;// 等待超时,设置为broken状态并且抛出超时异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
  1. 任何一个线程等待时发生异常,CyclicBarrier都将被设置为broken状态,运行都会失败

  2. 每次运行成功之后CyclicBarrier都会清理运行状态,这样CyclicBarrier可以重新使用

  3. 对于设置了超时的等待,在发生超时的时候会引起CyclicBarrier的broken

CountDownLatch

CountDownLatch同样也是一个线程同步的辅助类,同样适用上面的集合点的场景来解释,但是运行模式完全不同。CyclicBarrier是参与的所有的线程彼此等待,CountDownLatch则不同,CountDownLatch有一个导游线程在等待,每个线程报到一下即可无须等待,等到导游线程发现所有人都已经报到了,就结束了自己的等待。

CountDownLatch的构造方法允许指定参与的线程数量:

public CountDownLatch(int count)

参与线程使用countDown表示报到:

   public void countDown() {sync.releaseShared(1);}

看到releaseShared很容易使人联想到共享锁,那么试着用共享锁的运行模式来解释就简单得多了:和信号量的实现类似,CountDownLatch内置一下有限的共享锁。每个参与线程拥有一把共享锁,调用countDown就等于是释放了自己的共享锁,导游线程await等于一下子要拿回所有的共享锁。那么基于AbstractQueuedSynchronizer类来实现就很简单了:

   public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}

在await时注意到数量是1,其实这个参数对于CountDownLatch实现的Sync类(AbstractQueuedSynchronizer的子类)来说是不起作用的,因为需要保证await获取共享锁时必须拿到所有的共享锁,这个参数也就变得没有意义了。看一下Sync的tryAcquireShared方法就明白了:

        protected int tryAcquireShared(int acquires) {// 和信号量Semaphore的实现一样,使用state来存储count,// 每次释放共享锁就把state减1,state为0表示所有的共享// 锁已经被释放。注意:这里的acquires参数不起作用return (getState() == 0) ? 1 : -1;}

因此Sync的tryReleaseShared就是更新state(每次state减1):

 protected boolean tryReleaseShared(int releases) {// 每次state减1,当state为0,返回false表示所有的共享锁都已经释放for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}

CyclicBarrier和CountDownLatch本质上来说都是多个线程同步的辅助工具,前者可以看成分布式的,后者可以看出是主从式。

Phaser

Phaser是JDK7新添加的线程同步辅助类,作用同CyclicBarrier,CountDownLatch类似,但是使用起来更加灵活:

  1. Parties是动态的。

  2. Phaser支持树状结构,即Phaser可以有一个父Phaser。

Phaser的构造函数涉及到两个参数:父Phaser和初始的parties,因此提供了4个构造函数:

public Phaser();
public Phaser(int parties);
public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);

因为Phaser的特色在在于动态的parties,因此首先来看动态更新parties是如何实现的。

Phaser提供了两个方法:register和bulkRegister,前者会添加一个需要同步的线程,后者会添加parties个需要同步的线程。

    public int register() {return doRegister(1);}// 增加了参数的检查public int bulkRegister(int parties) {if (parties < 0)throw new IllegalArgumentException();if (parties == 0)return getPhase();return doRegister(parties);}

两个方法都调用了doRegister方法,因此接下来就来看看doRegister方法。

在分析doRegister之前先来说说Phaser的成员变量:state,它存储了Phaser的状态信息:

private volatile long state;
  1. state的最高位是一个标志位,1表示Phaser的线程同步已经结束,0表示线程同步正在进行

  2. state的低32位中,低16位表示没有到达的线程数量,高16位表示Parties值

  3. state的高32位除了最高位之外的其他31位表示的Phaser的phase,可以理解为第多少次同步(从0开始计算)。

介绍完了state,来看方法doRegister:

 private int doRegister(int registrations) {// 把registrations值同时加到parties值和还未达到的线程数量中去long adj = ((long)registrations << PARTIES_SHIFT) | registrations;final Phaser parent = this.parent;int phase;for (;;) {long s = state;int counts = (int)s;int parties = counts >>> PARTIES_SHIFT;int unarrived = counts & UNARRIVED_MASK;// 超过了允许的最大partiesif (registrations > MAX_PARTIES - parties)throw new IllegalStateException(badRegister(s));// 最高位为1,表示Phaser的线程同步已经结束else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)break;// Phaser中的parties不是0else if (counts != EMPTY) {// 如果当前Phaser没有父Phaser,或者如果有父Phaser,// 刷新自己的state值,如果刷新后的state没有变化。// 这里刷新子Phaser的原因在于,会出现父Phaser已经进入下一个phase// 而子Phaser却没有及时进入下一个phase的延迟现象if (parent == null || reconcileState() == s) {// 如果所有线程都到达了,等待Phaser进入下一次同步开始if (unarrived == 0)root.internalAwaitAdvance(phase, null);// 更新state成功,跳出循环完成注册else if (UNSAFE.compareAndSwapLong(this, stateOffset,s, s + adj))break;}}// 第一次注册,且不是子Phaserelse if (parent == null) {// 更新当前Phaser的state值成功则完成注册long next = ((long)phase << PHASE_SHIFT) | adj;if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))break;}// 第一次注册到子Phaserelse {// 锁定当前Phaser对象synchronized (this) {// 再次检查state值,确保没有被更新if (state == s) {// 注册到父Phaser中去parent.doRegister(1);do { // 获取当前phase值phase = (int)(root.state >>> PHASE_SHIFT);} while (!UNSAFE.compareAndSwapLong(this, stateOffset, state,((long)phase << PHASE_SHIFT) | adj));// 更新当前Phaser的state值break;}}}}return phase;}

看完了注册,那么来看同步操作的arrive,这里也涉及到两个方法:arrive和arriveAndDeregister,前者会等待其他线程的到达,后者则会立刻返回:

  public int arrive() {return doArrive(false);}public int arriveAndDeregister() {return doArrive(true);}

两个方法都调用了doArrive方法,区别在于参数一个是false,一个是true。那么来看doArrive:

  private int doArrive(boolean deregister) {// arrive需要把未到达的线程数减去1,// deregister为true,需要把parties值也减去1int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;final Phaser root = this.root;for (;;) {// 如果是有父Phaser,首先刷新自己的statelong s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);int counts = (int)s;int unarrived = (counts & UNARRIVED_MASK) - 1;// 最高位为1,表示同步已经结束,返回phase值if (phase < 0)return phase;// 如果parties为0或者在此次arrive之前所有线程到达else if (counts == EMPTY || unarrived < 0) {// 对于非子Phaser来说,上述情况的arrive肯定是非法的// 对于子Phaser首先刷新一下状态再做检查if (root == this || reconcileState() == s)throw new IllegalStateException(badArrive(s));}// 正常情况下,首先更新stateelse if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {// 所有线程都已经到达if (unarrived == 0) {// 计算parties作为下一个phase的未到达的partieslong n = s & PARTIES_MASK;int nextUnarrived = (int)n >>> PARTIES_SHIFT;// 调用父Phaser的doArriveif (root != this)// 如果下一个phase的未到达的parties为0,则需要向// 父Phaser取消注册return parent.doArrive(nextUnarrived == 0);// 正在进入下一个Phase,默认的实现是nextUnarrived为0// 表示正在进入下一个Phase,因为下一个phase的parties// 为0,需要等待parties不为0if (onAdvance(phase, nextUnarrived))// 正在等待下一个phase,设置状态为终止n |= TERMINATION_BIT;else if (nextUnarrived == 0)// 下一个phase的parties为0,更新未到达的parties的值n |= EMPTY;else// 更新下一个phase的未到达的parties的值n |= nextUnarrived;// phase值加1n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;// 更新state值UNSAFE.compareAndSwapLong(this, stateOffset, s, n);// 唤醒等待的线程releaseWaiters(phase);}return phase;}}}

关于arrive还有一个方法:arriveAndAwaitAdvance。这个方法会等到下一个phase开始再返回,相等于doArrive方法添加了awaitAdvance方法的功能。基本逻辑和上面说的doArrive方法类似:

  public int arriveAndAwaitAdvance() {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);int counts = (int)s;int unarrived = (counts & UNARRIVED_MASK) - 1;if (phase < 0)return phase;else if (counts == EMPTY || unarrived < 0) {// 对于非子Phaser来说,因为可以等待下一个phase,// 所以不是非法arriveif (reconcileState() == s)throw new IllegalStateException(badArrive(s));}else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,s -= ONE_ARRIVAL)) {// 还有其他线程没有达到,就会等待直到下一个phase开始if (unarrived != 0)return root.internalAwaitAdvance(phase, null);if (root != this)return parent.arriveAndAwaitAdvance();long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))return (int)(state >>> PHASE_SHIFT);releaseWaiters(phase);return nextPhase;}}}

所谓线程等待Phaser的当前phase结束并转到下一个phase的过程。Phaser提供了三个方法:

// 不可中断,没有超时的版本
public int awaitAdvance(int phase);// 可以中断,没有超时的版本
public int awaitAdvanceInterruptibly(int phase);// 可以中断,带有超时的版本
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);

这三个版本的方法的实现大体类似,区别在于第二个版本多了中断异常,第三个版本多了中断异常和超时异常。

   public int awaitAdvance(int phase) {// 获取当前statefinal Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);// 检查给定的phase是否和当前的phase一直if (phase < 0)return phase;if (p == phase)return root.internalAwaitAdvance(phase, null);return p;}// 多了一个对于中断的检查然后抛出中断异常public int awaitAdvanceInterruptibly(int phase)throws InterruptedException {final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase) {// 使用QNode实现中断和超时,这里不带超时QNode node = new QNode(this, phase, true, false, 0L);p = root.internalAwaitAdvance(phase, node);// 对于中断的情况,抛出中断异常if (node.wasInterrupted)throw new InterruptedException();}return p;}// 多了中断异常和超时异常public int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)throws InterruptedException, TimeoutException {long nanos = unit.toNanos(timeout);final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase) {QNode node = new QNode(this, phase, true, true, nanos);p = root.internalAwaitAdvance(phase, node);// 中断异常if (node.wasInterrupted)throw new InterruptedException();// 没有进入下一个phase,抛出超时异常else if (p == phase)throw new TimeoutException();}return p;}

上述三个方法都是调用了internalAwaitAdvance方法来实现等待,因此来看internalAwaitAdvance方法:

  private int internalAwaitAdvance(int phase, QNode node) {// 释放上一个phase的资源releaseWaiters(phase-1);// node是否被加入到队列中boolean queued = false;// 记录前一个Unarrived,用来增加spin值int lastUnarrived = 0;int spins = SPINS_PER_ARRIVAL;long s;int p;// 循环操作直到phase值发生了变化while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {// 不可中断的模式,使用自旋等待if (node == null) {int unarrived = (int)s & UNARRIVED_MASK;if (unarrived != lastUnarrived &&(lastUnarrived = unarrived) < NCPU)spins += SPINS_PER_ARRIVAL;boolean interrupted = Thread.interrupted();// 发生了中断时,使用一个node来记录这个中断if (interrupted || --spins < 0) {node = new QNode(this, phase, false, false, 0L);node.wasInterrupted = interrupted;}}// 当前线程的node可以结束等待了,后面会分析isReleasible方法else if (node.isReleasable())break;// 把node加入到队列中else if (!queued) {// 根据phase值不同,使用不同的队列AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;QNode q = node.next = head.get();// 检查队列的phase是否和要求的phase一致并且Phaser的phase没有发生变化// 符合这两个条件才把node添加到队列中去if ((q == null || q.phase == phase) &&(int)(state >>> PHASE_SHIFT) == phase)queued = head.compareAndSet(q, node);}// node加入队列后直接等待else {try {// 对于普通线程来说,这个方法作用就是循环直到isReleasable返回true// 或者block方法返回trueForkJoinPool.managedBlock(node);} catch (InterruptedException ie) {node.wasInterrupted = true;}}}// 对于进入队列的node,重置一些属性if (node != null) {// 释放thread,不要再使用unparkif (node.thread != null)node.thread = null;// 对于不可中断模式下发生的中断,清除中断状态if (node.wasInterrupted && !node.interruptible)Thread.currentThread().interrupt();// phase依旧没有变化表明同步过程被终止了if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)return abortWait(phase);}// 通知所有的等待线程releaseWaiters(phase);return p;}

下面来看QNode,它实现了ManagedBlocker接口(见ForkJoinPool),ManagedBlocker包含两个方法:isReleasable和block。

isReleasable表示等待可以结束了,下面是QNode实现的isReleasable:

     public boolean isReleasable() {// 没了等待线程,通常会在外部使用"node.thread = null"来释放等待线程,这时可以结束等待if (thread == null)return true;// phase发生变化,可以结束等待if (phaser.getPhase() != phase) {thread = null;return true;}// 可中断的情况下发生线程中断,可以结束等待if (Thread.interrupted())wasInterrupted = true;if (wasInterrupted && interruptible) {thread = null;return true;}// 设置超时的情况下,发生超时,可以结束等待if (timed) {if (nanos > 0L) {long now = System.nanoTime();nanos -= now - lastTime;lastTime = now;}if (nanos <= 0L) {thread = null;return true;}}return false;}

最后来看QNode实现的block方法,核心思想是用LockSupport来实现线程等待:

       public boolean block() {if (isReleasable())return true;// 没有设置超时的情况else if (!timed)LockSupport.park(this);// 设置超时的情况else if (nanos > 0)LockSupport.parkNanos(this, nanos);return isReleasable();}

最后来看releaseWaiters方法,看看怎么释放node队列:

    private void releaseWaiters(int phase) {QNode q;Thread t;AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;// 如果phase已经发生了变化,才能释放while ((q = head.get()) != null &&q.phase != (int)(root.state >>> PHASE_SHIFT)) {// 释放节点并转到下一个节点if (head.compareAndSet(q, q.next) &&(t = q.thread) != null) {// 释放线程q.thread = null;// 通知线程结束等待LockSupport.unpark(t);}}}

这篇关于java.util.concurrent 包源码分析之线程同步辅助的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/958843

相关文章

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.