java.util.concurrent 包源码分析之线程同步辅助

2024-05-04 09:08

本文主要是介绍java.util.concurrent 包源码分析之线程同步辅助,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CyclicBarrier

CyclicBarrier是一个用于线程同步的辅助类,它允许一组线程等待彼此,直到所有线程都到达集合点,然后执行某个设定的任务。现实中有个很好的例子来形容:几个人约定了某个地方集中,然后一起出发去旅行。每个参与的人就是一个线程,CyclicBarrier就是那个集合点,所有人到了之后,就一起出发。

CyclicBarrier的构造函数有两个:

// parties是参与等待的线程的数量,barrierAction是所有线程达到集合点之后要做的动作
public CyclicBarrier(int parties, Runnable barrierAction);// 达到集合点之后不执行操作的构造函数
public CyclicBarrier(int parties)

需要说明的是,CyclicBarrier只是记录线程的数目,CyclicBarrier是不创建任何线程的。线程是通过调用CyclicBarrier的await方法来等待其他线程,如果调用await方法的线程数目达到了预设值,也就是上面构造方法中的parties,CyclicBarrier就会开始执行barrierAction。

因此我们来看CyclicBarrier的核心方法dowait,也就是await方法调用的私有方法:

   private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;lock.lock();try {final Generation g = generation;if (g.broken)throw new BrokenBarrierException();if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// count就是预设的parties,count减1的值表示还剩余几个// 线程没有达到该集合点int index = --count;// index为0表示所有的线程都已经达到集合点,这时// 占用最后一个线程,执行运行设定的任务if (index == 0) {boolean ranAction = false;try {final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 唤醒其他等待的线程,// 更新generation以便下一次运行nextGeneration();return 0;} finally {// 如果运行任务时发生异常,设置状态为broken// 并且唤醒其他等待的线程if (!ranAction)breakBarrier();}}// 还有线程没有调用await,进入循环等待直到其他线程// 达到集合点或者等待超时for (;;) {try {// 如果没有设置超时,进行无超时的等待if (!timed)trip.await();// 有超时设置,进行有超时的等待else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {// generation如果没有被更新表示还是当前的运行// (generation被更新表示集合完毕并且任务成功),// 在状态没有被设置为broken状态的情况下,遇到线程// 中断异常表示当前线程等待失败,需要设置为broken// 状态,并且抛出中断异常if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {// else对应的条件为:g != generation || g.broken// 表示要么generation已经被更新意味着所有线程已经到达// 集合点并且任务执行成功,要么就是是broken状态意味着// 任务执行失败,无论哪种情况所有线程已经达到集合点,当// 前线程要结束等待了,发生了中断异常,需要中断当前线程// 表示遇到了中断异常。Thread.currentThread().interrupt();}}// 如果发现当前状态为broken,抛出异常if (g.broken)throw new BrokenBarrierException();// generation被更新表示所有线程都已经达到集合点// 并且预设任务已经完成,返回该线程进入等待顺序号if (g != generation)return index;// 等待超时,设置为broken状态并且抛出超时异常if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}
  1. 任何一个线程等待时发生异常,CyclicBarrier都将被设置为broken状态,运行都会失败

  2. 每次运行成功之后CyclicBarrier都会清理运行状态,这样CyclicBarrier可以重新使用

  3. 对于设置了超时的等待,在发生超时的时候会引起CyclicBarrier的broken

CountDownLatch

CountDownLatch同样也是一个线程同步的辅助类,同样适用上面的集合点的场景来解释,但是运行模式完全不同。CyclicBarrier是参与的所有的线程彼此等待,CountDownLatch则不同,CountDownLatch有一个导游线程在等待,每个线程报到一下即可无须等待,等到导游线程发现所有人都已经报到了,就结束了自己的等待。

CountDownLatch的构造方法允许指定参与的线程数量:

public CountDownLatch(int count)

参与线程使用countDown表示报到:

   public void countDown() {sync.releaseShared(1);}

看到releaseShared很容易使人联想到共享锁,那么试着用共享锁的运行模式来解释就简单得多了:和信号量的实现类似,CountDownLatch内置一下有限的共享锁。每个参与线程拥有一把共享锁,调用countDown就等于是释放了自己的共享锁,导游线程await等于一下子要拿回所有的共享锁。那么基于AbstractQueuedSynchronizer类来实现就很简单了:

   public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}

在await时注意到数量是1,其实这个参数对于CountDownLatch实现的Sync类(AbstractQueuedSynchronizer的子类)来说是不起作用的,因为需要保证await获取共享锁时必须拿到所有的共享锁,这个参数也就变得没有意义了。看一下Sync的tryAcquireShared方法就明白了:

        protected int tryAcquireShared(int acquires) {// 和信号量Semaphore的实现一样,使用state来存储count,// 每次释放共享锁就把state减1,state为0表示所有的共享// 锁已经被释放。注意:这里的acquires参数不起作用return (getState() == 0) ? 1 : -1;}

因此Sync的tryReleaseShared就是更新state(每次state减1):

 protected boolean tryReleaseShared(int releases) {// 每次state减1,当state为0,返回false表示所有的共享锁都已经释放for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}

CyclicBarrier和CountDownLatch本质上来说都是多个线程同步的辅助工具,前者可以看成分布式的,后者可以看出是主从式。

Phaser

Phaser是JDK7新添加的线程同步辅助类,作用同CyclicBarrier,CountDownLatch类似,但是使用起来更加灵活:

  1. Parties是动态的。

  2. Phaser支持树状结构,即Phaser可以有一个父Phaser。

Phaser的构造函数涉及到两个参数:父Phaser和初始的parties,因此提供了4个构造函数:

public Phaser();
public Phaser(int parties);
public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);

因为Phaser的特色在在于动态的parties,因此首先来看动态更新parties是如何实现的。

Phaser提供了两个方法:register和bulkRegister,前者会添加一个需要同步的线程,后者会添加parties个需要同步的线程。

    public int register() {return doRegister(1);}// 增加了参数的检查public int bulkRegister(int parties) {if (parties < 0)throw new IllegalArgumentException();if (parties == 0)return getPhase();return doRegister(parties);}

两个方法都调用了doRegister方法,因此接下来就来看看doRegister方法。

在分析doRegister之前先来说说Phaser的成员变量:state,它存储了Phaser的状态信息:

private volatile long state;
  1. state的最高位是一个标志位,1表示Phaser的线程同步已经结束,0表示线程同步正在进行

  2. state的低32位中,低16位表示没有到达的线程数量,高16位表示Parties值

  3. state的高32位除了最高位之外的其他31位表示的Phaser的phase,可以理解为第多少次同步(从0开始计算)。

介绍完了state,来看方法doRegister:

 private int doRegister(int registrations) {// 把registrations值同时加到parties值和还未达到的线程数量中去long adj = ((long)registrations << PARTIES_SHIFT) | registrations;final Phaser parent = this.parent;int phase;for (;;) {long s = state;int counts = (int)s;int parties = counts >>> PARTIES_SHIFT;int unarrived = counts & UNARRIVED_MASK;// 超过了允许的最大partiesif (registrations > MAX_PARTIES - parties)throw new IllegalStateException(badRegister(s));// 最高位为1,表示Phaser的线程同步已经结束else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)break;// Phaser中的parties不是0else if (counts != EMPTY) {// 如果当前Phaser没有父Phaser,或者如果有父Phaser,// 刷新自己的state值,如果刷新后的state没有变化。// 这里刷新子Phaser的原因在于,会出现父Phaser已经进入下一个phase// 而子Phaser却没有及时进入下一个phase的延迟现象if (parent == null || reconcileState() == s) {// 如果所有线程都到达了,等待Phaser进入下一次同步开始if (unarrived == 0)root.internalAwaitAdvance(phase, null);// 更新state成功,跳出循环完成注册else if (UNSAFE.compareAndSwapLong(this, stateOffset,s, s + adj))break;}}// 第一次注册,且不是子Phaserelse if (parent == null) {// 更新当前Phaser的state值成功则完成注册long next = ((long)phase << PHASE_SHIFT) | adj;if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))break;}// 第一次注册到子Phaserelse {// 锁定当前Phaser对象synchronized (this) {// 再次检查state值,确保没有被更新if (state == s) {// 注册到父Phaser中去parent.doRegister(1);do { // 获取当前phase值phase = (int)(root.state >>> PHASE_SHIFT);} while (!UNSAFE.compareAndSwapLong(this, stateOffset, state,((long)phase << PHASE_SHIFT) | adj));// 更新当前Phaser的state值break;}}}}return phase;}

看完了注册,那么来看同步操作的arrive,这里也涉及到两个方法:arrive和arriveAndDeregister,前者会等待其他线程的到达,后者则会立刻返回:

  public int arrive() {return doArrive(false);}public int arriveAndDeregister() {return doArrive(true);}

两个方法都调用了doArrive方法,区别在于参数一个是false,一个是true。那么来看doArrive:

  private int doArrive(boolean deregister) {// arrive需要把未到达的线程数减去1,// deregister为true,需要把parties值也减去1int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;final Phaser root = this.root;for (;;) {// 如果是有父Phaser,首先刷新自己的statelong s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);int counts = (int)s;int unarrived = (counts & UNARRIVED_MASK) - 1;// 最高位为1,表示同步已经结束,返回phase值if (phase < 0)return phase;// 如果parties为0或者在此次arrive之前所有线程到达else if (counts == EMPTY || unarrived < 0) {// 对于非子Phaser来说,上述情况的arrive肯定是非法的// 对于子Phaser首先刷新一下状态再做检查if (root == this || reconcileState() == s)throw new IllegalStateException(badArrive(s));}// 正常情况下,首先更新stateelse if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {// 所有线程都已经到达if (unarrived == 0) {// 计算parties作为下一个phase的未到达的partieslong n = s & PARTIES_MASK;int nextUnarrived = (int)n >>> PARTIES_SHIFT;// 调用父Phaser的doArriveif (root != this)// 如果下一个phase的未到达的parties为0,则需要向// 父Phaser取消注册return parent.doArrive(nextUnarrived == 0);// 正在进入下一个Phase,默认的实现是nextUnarrived为0// 表示正在进入下一个Phase,因为下一个phase的parties// 为0,需要等待parties不为0if (onAdvance(phase, nextUnarrived))// 正在等待下一个phase,设置状态为终止n |= TERMINATION_BIT;else if (nextUnarrived == 0)// 下一个phase的parties为0,更新未到达的parties的值n |= EMPTY;else// 更新下一个phase的未到达的parties的值n |= nextUnarrived;// phase值加1n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;// 更新state值UNSAFE.compareAndSwapLong(this, stateOffset, s, n);// 唤醒等待的线程releaseWaiters(phase);}return phase;}}}

关于arrive还有一个方法:arriveAndAwaitAdvance。这个方法会等到下一个phase开始再返回,相等于doArrive方法添加了awaitAdvance方法的功能。基本逻辑和上面说的doArrive方法类似:

  public int arriveAndAwaitAdvance() {final Phaser root = this.root;for (;;) {long s = (root == this) ? state : reconcileState();int phase = (int)(s >>> PHASE_SHIFT);int counts = (int)s;int unarrived = (counts & UNARRIVED_MASK) - 1;if (phase < 0)return phase;else if (counts == EMPTY || unarrived < 0) {// 对于非子Phaser来说,因为可以等待下一个phase,// 所以不是非法arriveif (reconcileState() == s)throw new IllegalStateException(badArrive(s));}else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,s -= ONE_ARRIVAL)) {// 还有其他线程没有达到,就会等待直到下一个phase开始if (unarrived != 0)return root.internalAwaitAdvance(phase, null);if (root != this)return parent.arriveAndAwaitAdvance();long n = s & PARTIES_MASK;  // base of next stateint nextUnarrived = (int)n >>> PARTIES_SHIFT;if (onAdvance(phase, nextUnarrived))n |= TERMINATION_BIT;else if (nextUnarrived == 0)n |= EMPTY;elsen |= nextUnarrived;int nextPhase = (phase + 1) & MAX_PHASE;n |= (long)nextPhase << PHASE_SHIFT;if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))return (int)(state >>> PHASE_SHIFT);releaseWaiters(phase);return nextPhase;}}}

所谓线程等待Phaser的当前phase结束并转到下一个phase的过程。Phaser提供了三个方法:

// 不可中断,没有超时的版本
public int awaitAdvance(int phase);// 可以中断,没有超时的版本
public int awaitAdvanceInterruptibly(int phase);// 可以中断,带有超时的版本
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);

这三个版本的方法的实现大体类似,区别在于第二个版本多了中断异常,第三个版本多了中断异常和超时异常。

   public int awaitAdvance(int phase) {// 获取当前statefinal Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);// 检查给定的phase是否和当前的phase一直if (phase < 0)return phase;if (p == phase)return root.internalAwaitAdvance(phase, null);return p;}// 多了一个对于中断的检查然后抛出中断异常public int awaitAdvanceInterruptibly(int phase)throws InterruptedException {final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase) {// 使用QNode实现中断和超时,这里不带超时QNode node = new QNode(this, phase, true, false, 0L);p = root.internalAwaitAdvance(phase, node);// 对于中断的情况,抛出中断异常if (node.wasInterrupted)throw new InterruptedException();}return p;}// 多了中断异常和超时异常public int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit)throws InterruptedException, TimeoutException {long nanos = unit.toNanos(timeout);final Phaser root = this.root;long s = (root == this) ? state : reconcileState();int p = (int)(s >>> PHASE_SHIFT);if (phase < 0)return phase;if (p == phase) {QNode node = new QNode(this, phase, true, true, nanos);p = root.internalAwaitAdvance(phase, node);// 中断异常if (node.wasInterrupted)throw new InterruptedException();// 没有进入下一个phase,抛出超时异常else if (p == phase)throw new TimeoutException();}return p;}

上述三个方法都是调用了internalAwaitAdvance方法来实现等待,因此来看internalAwaitAdvance方法:

  private int internalAwaitAdvance(int phase, QNode node) {// 释放上一个phase的资源releaseWaiters(phase-1);// node是否被加入到队列中boolean queued = false;// 记录前一个Unarrived,用来增加spin值int lastUnarrived = 0;int spins = SPINS_PER_ARRIVAL;long s;int p;// 循环操作直到phase值发生了变化while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {// 不可中断的模式,使用自旋等待if (node == null) {int unarrived = (int)s & UNARRIVED_MASK;if (unarrived != lastUnarrived &&(lastUnarrived = unarrived) < NCPU)spins += SPINS_PER_ARRIVAL;boolean interrupted = Thread.interrupted();// 发生了中断时,使用一个node来记录这个中断if (interrupted || --spins < 0) {node = new QNode(this, phase, false, false, 0L);node.wasInterrupted = interrupted;}}// 当前线程的node可以结束等待了,后面会分析isReleasible方法else if (node.isReleasable())break;// 把node加入到队列中else if (!queued) {// 根据phase值不同,使用不同的队列AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;QNode q = node.next = head.get();// 检查队列的phase是否和要求的phase一致并且Phaser的phase没有发生变化// 符合这两个条件才把node添加到队列中去if ((q == null || q.phase == phase) &&(int)(state >>> PHASE_SHIFT) == phase)queued = head.compareAndSet(q, node);}// node加入队列后直接等待else {try {// 对于普通线程来说,这个方法作用就是循环直到isReleasable返回true// 或者block方法返回trueForkJoinPool.managedBlock(node);} catch (InterruptedException ie) {node.wasInterrupted = true;}}}// 对于进入队列的node,重置一些属性if (node != null) {// 释放thread,不要再使用unparkif (node.thread != null)node.thread = null;// 对于不可中断模式下发生的中断,清除中断状态if (node.wasInterrupted && !node.interruptible)Thread.currentThread().interrupt();// phase依旧没有变化表明同步过程被终止了if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)return abortWait(phase);}// 通知所有的等待线程releaseWaiters(phase);return p;}

下面来看QNode,它实现了ManagedBlocker接口(见ForkJoinPool),ManagedBlocker包含两个方法:isReleasable和block。

isReleasable表示等待可以结束了,下面是QNode实现的isReleasable:

     public boolean isReleasable() {// 没了等待线程,通常会在外部使用"node.thread = null"来释放等待线程,这时可以结束等待if (thread == null)return true;// phase发生变化,可以结束等待if (phaser.getPhase() != phase) {thread = null;return true;}// 可中断的情况下发生线程中断,可以结束等待if (Thread.interrupted())wasInterrupted = true;if (wasInterrupted && interruptible) {thread = null;return true;}// 设置超时的情况下,发生超时,可以结束等待if (timed) {if (nanos > 0L) {long now = System.nanoTime();nanos -= now - lastTime;lastTime = now;}if (nanos <= 0L) {thread = null;return true;}}return false;}

最后来看QNode实现的block方法,核心思想是用LockSupport来实现线程等待:

       public boolean block() {if (isReleasable())return true;// 没有设置超时的情况else if (!timed)LockSupport.park(this);// 设置超时的情况else if (nanos > 0)LockSupport.parkNanos(this, nanos);return isReleasable();}

最后来看releaseWaiters方法,看看怎么释放node队列:

    private void releaseWaiters(int phase) {QNode q;Thread t;AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;// 如果phase已经发生了变化,才能释放while ((q = head.get()) != null &&q.phase != (int)(root.state >>> PHASE_SHIFT)) {// 释放节点并转到下一个节点if (head.compareAndSet(q, q.next) &&(t = q.thread) != null) {// 释放线程q.thread = null;// 通知线程结束等待LockSupport.unpark(t);}}}

这篇关于java.util.concurrent 包源码分析之线程同步辅助的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/958843

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory