Java并发学习(十八)-并发工具Exchanger

2023-10-07 16:10

本文主要是介绍Java并发学习(十八)-并发工具Exchanger,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

断断续续看了一个多礼拜,Exchanger总算是看明白了,思想不难,但是不理解思想去看代码就比较难了。
下面慢慢学习。

What is Exchanger

关于Exchanger,你可以把他看做一个中介,或者信使,它可以让两个运行的线程相互交换东西(Object),并且是带阻塞性质的。
打个比方,两个线程A,B两个要交换东西oa和ob,它们都在运行,使用exchanger这个中介,因为线程调度,并不知道那个线程先去到exchanger,这里假设为A。当A到了后,发现B还没来是吧,那它就要等待(park),当B来了后,发现B在exchanger那儿等它,他就和B交换oa和ob,并唤醒,然后它们两个线程就愉快的自己运行了。

当然上面只是并发量小的情况,如果一旦并发量大,则会使用多个中介(arena数组)来进行。

先给几个例子看看到底是怎么中介的。

例子

下面给出两个比较典型的例子讲解下具体意思:

例子1
public class ExchangerTest2 {private static volatile boolean isDone = false;static class ExchangerProducer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 1;ExchangerProducer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}public void run() {try {data = 1;System.out.println("producer before: " + data);data = exchanger.exchange(data);System.out.println("producer after: " + data);} catch (InterruptedException e) {e.printStackTrace();}}}static class ExchangerConsumer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 0;ExchangerConsumer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}public void run() {data = 0;System.out.println("consumer before : " + data);try {data = exchanger.exchange(data);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("consumer after : " + data);}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();Exchanger<Integer> exchanger = new Exchanger<Integer>();new Thread(new ExchangerConsumer(exchanger)).start();new Thread(new ExchangerProducer(exchanger)).start();}
}

输出为:
这里写图片描述

只有两个线程,并且两个线程只交换一次数据。consummer,然后consummer等待,producer提取。

例子2
public class ExchangerTest {static class Producer implements Runnable {private String buffer;private Exchanger<String> exchanger;Producer(String buffer, Exchanger<String> exchanger) {this.buffer = buffer;this.exchanger = exchanger;}public void run() {for (int i = 1; i < 5; i++) {try {System.out.println("生产者第" + i + "次生产");exchanger.exchange(buffer);} catch (InterruptedException e) {e.printStackTrace();}}}}static class Consumer implements Runnable {private String buffer;private final Exchanger<String> exchanger;public Consumer(String buffer, Exchanger<String> exchanger) {this.buffer = buffer;this.exchanger = exchanger;}public void run() {for (int i = 1; i < 5; i++) {// 调用exchange()与消费者进行数据交换try {buffer = exchanger.exchange(buffer);System.out.println("消费者第" + i + "次消费");} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) throws Exception {String buffer1 = new String();String buffer2 = new String();Exchanger<String> exchanger = new Exchanger<String>();Thread producerThread = new Thread(new Producer(buffer1, exchanger));Thread consumerThread = new Thread(new Consumer(buffer2, exchanger));producerThread.start();consumerThread.start();}
}

输出:
这里写图片描述

对比第一个的输出,第二个例子的输出有点迷性,咋一看,生产者怎么不等消费者消费,就擅自第二次生产了?
其实没有,生产者还是在等待消费者的,只是由于cpu调度,消费者获取数据后,还没来得及消费,就又呗生产者抢到cpu时间,去进行第二次生产了。

现在估计有点感觉了吧

下面结合远吗具体分析。

形象的例子

学习代码时候,发现有大佬举过一个很形象的例子,可以帮助理解,这里就引用贴出来:

可以理解为多人之间,交换多个东西过程:

  1. 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
  2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
  3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
  4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
  5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
  6. 如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

Exchanger实现原理分析

先来讲讲Exchanger里面一些重要属性。
首先里面的主要方法就只有两个:

  • public V exchange(V x) throws InterruptedException
  • public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

即一个普通等待,一个超时时间控制的等待。

代码结构

其次,既然有线程等待,那么必然有数据结构,一个自定义的Node节点:

    @sun.misc.Contended static final class Node {/*** node在arena数组里面的索引*/int index;              // Arena index 索引int bound;              // Last recorded value of Exchanger.bound 最后的exchanger的记录值。int collides;           // Number of CAS failures at current bound  如果CAS失败,就冲突自增int hash;               // Pseudo-random for spins  伪随机数的自旋,用于设定自旋次数。/*** 自己的资源*/Object item;            // This thread's current item  线程的当前对象/*** 对方的资源*/volatile Object match;  // Item provided by releasing thread  被释放线程提供的对象volatile Thread parked; // Set to this thread when parked, else null 当park时候,就把当前线程设置进去,否则为null。  }

前面讲过是交换数据嘛,所以需要一个Object来保存自己的资源,一个来保存自己获取的别人的资源。其他的字段可以看后面分析。

关于Contended 是防止伪共享的作用,具体可以看我这一片里面介绍:Java并发学习(十一)-LongAdder和LongAccumulator探究 。

还有一个值得注意的是里面有一个ThreadLocal变量:

/** * ThreadLocal对象,里面放Node。* */static final class Participant extends ThreadLocal<Node> {public Node initialValue() { return new Node(); }}/*** 线程状态。*/private final Participant participant;

所以每个线程虽然都使用Exchangeer,但是他们的participant并不相同,有各自的变量,这里关于ThreadLocal不多说。

接下来就是用于交换数据的slot和arena:

    /*** 当并发量大的时候,即多个线程用这一个Exchanger的时候*/private volatile Node[] arena;/*** 开始用slot,并发量小的时候,直到冲突了就更改。*/private volatile Node slot;
exchange方法

下面主要分析Exchanger方法:

    @SuppressWarnings("unchecked")public V exchange(V x) throws InterruptedException {Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null args 判断x是否为null。//下面代码逻辑就是,当arena为null,就先尝试执行slotExchange方法,否则就执行arenaExchangerif ((arena != null ||(v = slotExchange(item, false, 0L)) == null) &&((Thread.interrupted() || // disambiguates null return(v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;}

里面具体核心执行两个方法slotExchange(竞争不大)和arenaExchanger(竞争较大)这里主要分析这两个方法。

    /*** 当没有冲突不高的时候,也就是只有slot来交换数据的时候。*/private final Object slotExchange(Object item, boolean timed, long ns) {Node p = participant.get();   //获取当前线程私有的nodeThread t = Thread.currentThread();   //当前线程if (t.isInterrupted()) //   如果已经中断了。return null;for (Node q;;) {if ((q = slot) != null) {    //slot不为null时候,有人已经占了坑if (U.compareAndSwapObject(this, SLOT, q, null)) {  //null去替换q。也就是把这个slot置空,因为我来找你交换了啊,所以不用站这里了Object v = q.item;   //记录相关slot里面线程所持有的数据。q.match = item;    //我把你的也获取到。Thread w = q.parked;  //交换完东西,唤醒你。if (w != null)U.unpark(w);return v;}//如果走到这一步,就说明CAS失败了,判断是否需要用arena数组来支持。if (NCPU > 1 && bound == 0 &&      U.compareAndSwapInt(this, BOUND, 0, SEQ))     //用SEQ去替换0arena = new Node[(FULL + 2) << ASHIFT];     //初始化arena数组}else if (arena != null)return null; // caller must reroute to arenaExchange   //slot为null,但是arena不为空,那么就退出去执行arenaExchange方法。else {//slot为null,arena也为null,那么就说明现在没有线程到,当前线程是第一个到的,所以把p也就是threadLocal里面东西存到slot里面。p.item = item;if (U.compareAndSwapObject(this, SLOT, null, p))break;p.item = null;}}// await release 等待去释放。int h = p.hash;long end = timed ? System.nanoTime() + ns : 0L;       //如果设定有超时获取时间。int spins = (NCPU > 1) ? SPINS : 1;     //设定自旋,如果是单核则次数为1Object v;while ((v = p.match) == null) {//p为当前线程的node,v即对方的资源为null,所以没有来,我就自旋等会。if (spins > 0) {//选择一个自旋次数h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)//休息一会Thread.yield();}else if (slot != p)//这个slot不是自己,被别人抢走了。spins = SPINS;else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {//没有中断,且没有超时,那么你就park吧。//park过程。U.putObject(t, BLOCKER, this);p.parked = t;if (slot == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.compareAndSwapObject(this, SLOT, p, null)) {   //成功把slot置空,那么就跳出循环,此时要么返回超时,要么返回空。v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}//CAS防止重排序法,把match设为null,因为什么也没拿到,拿到不会走着条路。U.putOrderedObject(p, MATCH, null);p.item = null;p.hash = h;return v;}

方法的核心就是竞争这个slot,如果slot里面有node了,那么就尝试跟它交换;如果没有东西,那么就尝试自己占领那个节点等待,直到有线程来跟我交换并唤醒。

arenaExchange方法

接下来看arenaExchange方法:

    /*** 当是启用了arenas的时候,的更换方法。保存above。* 也就是并发大时候,把slot换为数组操作。*/private final Object arenaExchange(Object item, boolean timed, long ns) {Node[] a = arena;   //本地获取arenaNode p = participant.get();      //获取当前线程的node节点。for (int i = p.index;;) {                      // 获得p在arena的索引int b, m, c; long j;                       //j是偏移量Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);   //CAS方式从数组a里面获取qif (q != null && U.compareAndSwapObject(a, j, q, null)) {         //q不为null,就去跟它交换,并且置nullObject v = q.item;                     // 获取它的itemq.match = item;                   //把自己的item给他Thread w = q.parked;               //获取w并且唤醒它。if (w != null)U.unpark(w);return v;}else if (i <= (m = (b = bound) & MMASK) && q == null) {//q为null,就说明这个位置没人,我就占这儿。p.item = item;                         // 自己要等待嘛,所以把自己的node节点的item,放入传入的itemif (U.compareAndSwapObject(a, j, null, p)) {       //CAS方式,把p更换null。即尝试去占坑long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;   //如果有,获取end时间Thread t = Thread.currentThread(); // wait  获取当前线程for (int h = p.hash, spins = SPINS;;) {  //自旋操作Object v = p.match;if (v != null) {             //p的match不为null,说明自旋时候找到了配对的对方。需要做的就是把东西带走,坑置空,腾出位置U.putOrderedObject(p, MATCH, null); //清空一些信息p.item = null;             // clear for next usep.hash = h;return v;}else if (spins > 0) {//伪随机发,有经验的去将当前线程挂起,设定自旋h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0)                // initialize hashh = SPINS | (int)t.getId();else if (h < 0 &&          // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();        // 睡眠一会}else if (U.getObjectVolatile(a, j) != p)spins = SPINS;       // 如果不是自己,则继续自旋。else if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {//等了多次没等到,到时间了,那就挂起。免得浪费资源U.putObject(t, BLOCKER, this); // emulate LockSupportp.parked = t;              // minimize windowif (U.getObjectVolatile(a, j) == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.getObjectVolatile(a, j) == p &&U.compareAndSwapObject(a, j, p, null)) {//当前位置j仍然是p,并且成功把p换为了null。也就是放弃,并重新找个位置开始if (m != 0)                // try to shrinkU.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);p.item = null;p.hash = h;i = p.index >>>= 1;        // 减半,if (Thread.interrupted())return null;if (timed && m == 0 && ns <= 0L)  //超时返回空return TIMED_OUT;break;                     // expired; restart 重新开始}}}elsep.item = null;                     // 没有占坑成功,那么就不换。}else {//需要的这个index,有人if (p.bound != b) {                    // stale; reset 重置p.bound = b;p.collides = 0;i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//CAS失败,增加冲突值。p.collides = c + 1;i = (i == 0) ? m : i - 1;          // cyclically traverse}elsei = m + 1;                         // growp.index = i;}}}

arenaExchange方法核心则相对slotExchanger复杂些,因为有了竞争,导致会CAS失败,所以这个时候要多准备几个slot就是arena数组。整个过程怎么理解呢?
可以理解为多个人之间换多个东西。所以要准备arena数组,否则众多线程等待,那会很影响性能的。
相互学习~

参考资料:
1. http://blog.csdn.net/chenssy/article/details/72550933
2. http://brokendreams.iteye.com/blog/2253956

这篇关于Java并发学习(十八)-并发工具Exchanger的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/158881

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06