本文主要是介绍Java并发学习(十八)-并发工具Exchanger,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
断断续续看了一个多礼拜,Exchanger总算是看明白了,思想不难,但是不理解思想去看代码就比较难了。
下面慢慢学习。
What is Exchanger
关于Exchanger,你可以把他看做一个中介,或者信使,它可以让两个运行的线程相互交换东西(Object),并且是带阻塞性质的。
打个比方,两个线程A,B两个要交换东西oa和ob,它们都在运行,使用exchanger这个中介,因为线程调度,并不知道那个线程先去到exchanger,这里假设为A。当A到了后,发现B还没来是吧,那它就要等待(park),当B来了后,发现B在exchanger那儿等它,他就和B交换oa和ob,并唤醒,然后它们两个线程就愉快的自己运行了。
当然上面只是并发量小的情况,如果一旦并发量大,则会使用多个中介(arena数组)来进行。
先给几个例子看看到底是怎么中介的。
例子
下面给出两个比较典型的例子讲解下具体意思:
例子1
public class ExchangerTest2 {private static volatile boolean isDone = false;static class ExchangerProducer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 1;ExchangerProducer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}public void run() {try {data = 1;System.out.println("producer before: " + data);data = exchanger.exchange(data);System.out.println("producer after: " + data);} catch (InterruptedException e) {e.printStackTrace();}}}static class ExchangerConsumer implements Runnable {private Exchanger<Integer> exchanger;private static int data = 0;ExchangerConsumer(Exchanger<Integer> exchanger) {this.exchanger = exchanger;}public void run() {data = 0;System.out.println("consumer before : " + data);try {data = exchanger.exchange(data);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("consumer after : " + data);}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();Exchanger<Integer> exchanger = new Exchanger<Integer>();new Thread(new ExchangerConsumer(exchanger)).start();new Thread(new ExchangerProducer(exchanger)).start();}
}
输出为:
只有两个线程,并且两个线程只交换一次数据。consummer,然后consummer等待,producer提取。
例子2
public class ExchangerTest {static class Producer implements Runnable {private String buffer;private Exchanger<String> exchanger;Producer(String buffer, Exchanger<String> exchanger) {this.buffer = buffer;this.exchanger = exchanger;}public void run() {for (int i = 1; i < 5; i++) {try {System.out.println("生产者第" + i + "次生产");exchanger.exchange(buffer);} catch (InterruptedException e) {e.printStackTrace();}}}}static class Consumer implements Runnable {private String buffer;private final Exchanger<String> exchanger;public Consumer(String buffer, Exchanger<String> exchanger) {this.buffer = buffer;this.exchanger = exchanger;}public void run() {for (int i = 1; i < 5; i++) {// 调用exchange()与消费者进行数据交换try {buffer = exchanger.exchange(buffer);System.out.println("消费者第" + i + "次消费");} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) throws Exception {String buffer1 = new String();String buffer2 = new String();Exchanger<String> exchanger = new Exchanger<String>();Thread producerThread = new Thread(new Producer(buffer1, exchanger));Thread consumerThread = new Thread(new Consumer(buffer2, exchanger));producerThread.start();consumerThread.start();}
}
输出:
对比第一个的输出,第二个例子的输出有点迷性,咋一看,生产者怎么不等消费者消费,就擅自第二次生产了?
其实没有,生产者还是在等待消费者的,只是由于cpu调度,消费者获取数据后,还没来得及消费,就又呗生产者抢到cpu时间,去进行第二次生产了。
现在估计有点感觉了吧
下面结合远吗具体分析。
形象的例子
学习代码时候,发现有大佬举过一个很形象的例子,可以帮助理解,这里就引用贴出来:
可以理解为多人之间,交换多个东西过程:
- 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
- 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
- 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
- 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
- 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
- 如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
Exchanger实现原理分析
先来讲讲Exchanger里面一些重要属性。
首先里面的主要方法就只有两个:
- public V exchange(V x) throws InterruptedException
- public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
即一个普通等待,一个超时时间控制的等待。
代码结构
其次,既然有线程等待,那么必然有数据结构,一个自定义的Node节点:
@sun.misc.Contended static final class Node {/*** node在arena数组里面的索引*/int index; // Arena index 索引int bound; // Last recorded value of Exchanger.bound 最后的exchanger的记录值。int collides; // Number of CAS failures at current bound 如果CAS失败,就冲突自增int hash; // Pseudo-random for spins 伪随机数的自旋,用于设定自旋次数。/*** 自己的资源*/Object item; // This thread's current item 线程的当前对象/*** 对方的资源*/volatile Object match; // Item provided by releasing thread 被释放线程提供的对象volatile Thread parked; // Set to this thread when parked, else null 当park时候,就把当前线程设置进去,否则为null。 }
前面讲过是交换数据嘛,所以需要一个Object来保存自己的资源,一个来保存自己获取的别人的资源。其他的字段可以看后面分析。
关于Contended
是防止伪共享的作用,具体可以看我这一片里面介绍:Java并发学习(十一)-LongAdder和LongAccumulator探究 。
还有一个值得注意的是里面有一个ThreadLocal变量:
/** * ThreadLocal对象,里面放Node。* */static final class Participant extends ThreadLocal<Node> {public Node initialValue() { return new Node(); }}/*** 线程状态。*/private final Participant participant;
所以每个线程虽然都使用Exchangeer,但是他们的participant并不相同,有各自的变量,这里关于ThreadLocal不多说。
接下来就是用于交换数据的slot和arena:
/*** 当并发量大的时候,即多个线程用这一个Exchanger的时候*/private volatile Node[] arena;/*** 开始用slot,并发量小的时候,直到冲突了就更改。*/private volatile Node slot;
exchange方法
下面主要分析Exchanger方法:
@SuppressWarnings("unchecked")public V exchange(V x) throws InterruptedException {Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null args 判断x是否为null。//下面代码逻辑就是,当arena为null,就先尝试执行slotExchange方法,否则就执行arenaExchangerif ((arena != null ||(v = slotExchange(item, false, 0L)) == null) &&((Thread.interrupted() || // disambiguates null return(v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;}
里面具体核心执行两个方法slotExchange(竞争不大)和arenaExchanger(竞争较大)这里主要分析这两个方法。
/*** 当没有冲突不高的时候,也就是只有slot来交换数据的时候。*/private final Object slotExchange(Object item, boolean timed, long ns) {Node p = participant.get(); //获取当前线程私有的nodeThread t = Thread.currentThread(); //当前线程if (t.isInterrupted()) // 如果已经中断了。return null;for (Node q;;) {if ((q = slot) != null) { //slot不为null时候,有人已经占了坑if (U.compareAndSwapObject(this, SLOT, q, null)) { //null去替换q。也就是把这个slot置空,因为我来找你交换了啊,所以不用站这里了Object v = q.item; //记录相关slot里面线程所持有的数据。q.match = item; //我把你的也获取到。Thread w = q.parked; //交换完东西,唤醒你。if (w != null)U.unpark(w);return v;}//如果走到这一步,就说明CAS失败了,判断是否需要用arena数组来支持。if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) //用SEQ去替换0arena = new Node[(FULL + 2) << ASHIFT]; //初始化arena数组}else if (arena != null)return null; // caller must reroute to arenaExchange //slot为null,但是arena不为空,那么就退出去执行arenaExchange方法。else {//slot为null,arena也为null,那么就说明现在没有线程到,当前线程是第一个到的,所以把p也就是threadLocal里面东西存到slot里面。p.item = item;if (U.compareAndSwapObject(this, SLOT, null, p))break;p.item = null;}}// await release 等待去释放。int h = p.hash;long end = timed ? System.nanoTime() + ns : 0L; //如果设定有超时获取时间。int spins = (NCPU > 1) ? SPINS : 1; //设定自旋,如果是单核则次数为1Object v;while ((v = p.match) == null) {//p为当前线程的node,v即对方的资源为null,所以没有来,我就自旋等会。if (spins > 0) {//选择一个自旋次数h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)//休息一会Thread.yield();}else if (slot != p)//这个slot不是自己,被别人抢走了。spins = SPINS;else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {//没有中断,且没有超时,那么你就park吧。//park过程。U.putObject(t, BLOCKER, this);p.parked = t;if (slot == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.compareAndSwapObject(this, SLOT, p, null)) { //成功把slot置空,那么就跳出循环,此时要么返回超时,要么返回空。v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}//CAS防止重排序法,把match设为null,因为什么也没拿到,拿到不会走着条路。U.putOrderedObject(p, MATCH, null);p.item = null;p.hash = h;return v;}
方法的核心就是竞争这个slot,如果slot里面有node了,那么就尝试跟它交换;如果没有东西,那么就尝试自己占领那个节点等待,直到有线程来跟我交换并唤醒。
arenaExchange方法
接下来看arenaExchange方法:
/*** 当是启用了arenas的时候,的更换方法。保存above。* 也就是并发大时候,把slot换为数组操作。*/private final Object arenaExchange(Object item, boolean timed, long ns) {Node[] a = arena; //本地获取arenaNode p = participant.get(); //获取当前线程的node节点。for (int i = p.index;;) { // 获得p在arena的索引int b, m, c; long j; //j是偏移量Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); //CAS方式从数组a里面获取qif (q != null && U.compareAndSwapObject(a, j, q, null)) { //q不为null,就去跟它交换,并且置nullObject v = q.item; // 获取它的itemq.match = item; //把自己的item给他Thread w = q.parked; //获取w并且唤醒它。if (w != null)U.unpark(w);return v;}else if (i <= (m = (b = bound) & MMASK) && q == null) {//q为null,就说明这个位置没人,我就占这儿。p.item = item; // 自己要等待嘛,所以把自己的node节点的item,放入传入的itemif (U.compareAndSwapObject(a, j, null, p)) { //CAS方式,把p更换null。即尝试去占坑long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; //如果有,获取end时间Thread t = Thread.currentThread(); // wait 获取当前线程for (int h = p.hash, spins = SPINS;;) { //自旋操作Object v = p.match;if (v != null) { //p的match不为null,说明自旋时候找到了配对的对方。需要做的就是把东西带走,坑置空,腾出位置U.putOrderedObject(p, MATCH, null); //清空一些信息p.item = null; // clear for next usep.hash = h;return v;}else if (spins > 0) {//伪随机发,有经验的去将当前线程挂起,设定自旋h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0) // initialize hashh = SPINS | (int)t.getId();else if (h < 0 && // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield(); // 睡眠一会}else if (U.getObjectVolatile(a, j) != p)spins = SPINS; // 如果不是自己,则继续自旋。else if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {//等了多次没等到,到时间了,那就挂起。免得浪费资源U.putObject(t, BLOCKER, this); // emulate LockSupportp.parked = t; // minimize windowif (U.getObjectVolatile(a, j) == p)U.park(false, ns);p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.getObjectVolatile(a, j) == p &&U.compareAndSwapObject(a, j, p, null)) {//当前位置j仍然是p,并且成功把p换为了null。也就是放弃,并重新找个位置开始if (m != 0) // try to shrinkU.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);p.item = null;p.hash = h;i = p.index >>>= 1; // 减半,if (Thread.interrupted())return null;if (timed && m == 0 && ns <= 0L) //超时返回空return TIMED_OUT;break; // expired; restart 重新开始}}}elsep.item = null; // 没有占坑成功,那么就不换。}else {//需要的这个index,有人if (p.bound != b) { // stale; reset 重置p.bound = b;p.collides = 0;i = (i != m || m == 0) ? m : m - 1;}else if ((c = p.collides) < m || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//CAS失败,增加冲突值。p.collides = c + 1;i = (i == 0) ? m : i - 1; // cyclically traverse}elsei = m + 1; // growp.index = i;}}}
arenaExchange方法核心则相对slotExchanger复杂些,因为有了竞争,导致会CAS失败,所以这个时候要多准备几个slot就是arena数组。整个过程怎么理解呢?
可以理解为多个人之间换多个东西。所以要准备arena数组,否则众多线程等待,那会很影响性能的。
相互学习~
参考资料:
1. http://blog.csdn.net/chenssy/article/details/72550933
2. http://brokendreams.iteye.com/blog/2253956
这篇关于Java并发学习(十八)-并发工具Exchanger的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!