java多线程间通信_Java中的线程间通信以光速传输

2023-10-08 11:10

本文主要是介绍java多线程间通信_Java中的线程间通信以光速传输,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

java多线程间通信

故事以一个简单的想法开始:创建一个开发人员友好,简单且轻量级的线程间通信框架,而无需使用任何锁,同步器,信号量,等待,通知; 并且没有队列,消息,事件或任何其他并发特定的词或工具。

只需让POJO在普通的旧Java接口之间进行通信即可。

它可能类似于Akka类型的actor ,但是由于新框架必须超轻量且针对单个多核计算机上的线程间通信进行了优化,因此可能会过大。

当参与者跨越同一机器上或跨网络分布的机器上的不同JVM实例之间的进程边界时,Akka框架非常适合进程间通信。

但是,在较小的项目中使用Akka类型的actor可能是多余的,在这些项目中,您仅需要线程间通信,但是您仍然要坚持使用typed actor方法。

我使用动态代理,阻塞队列和缓存的线程池在几天之内创建了一个解决方案。

图1显示了所创建框架的高层架构:



图1:框架的高级架构

SPSC队列是“单一生产者/单一消费者”队列。 MPSC队列是多生产者/单消费者。

分派器线程从Actor线程接收消息,并将其发送到适当的SPSC队列中。

参与者线程使用接收到的消息中的数据来调用参与者实例的相应方法。 通过使用其他角色的代理,角色实例将消息发送到MPSC队列,然后消息到达目标角色线程。

为了进行简单的测试,我创建了一个乒乓球示例:

public interface PlayerA (
void pong(long ball); //send and forget method call 
}
public interface PlayerB {   
void ping(PlayerA playerA, long ball); //send and forget method call    
}    
public class PlayerAImpl implements PlayerA {    
@Override    
@ublic void pong(long ball) {    
}    
}
public class PlayerBImpl implements PlayerB {   
@Override    
public void ping(PlayerA playerA, long ball) {    
playerA.pong(ball);    
}    
}
public class PingPongExample {   
public void testPingPong() {
// this manager hides the complexity of inter-thread communications   
// and it takes control over actor proxies, actor implementations and threads    
ActorManager manager = new ActorManager();
// registers actor implementations inside the manager   
manager.registerImpl(PlayerAImpl.class);    
manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages    
//which would be sent between threads to a specific actor instance.    
PlayerA playerA = manager.createActor(PlayerA.class);    
PlayerB playerB = manager.createActor(PlayerB.class);    
for(int i = 0; i < 1000000; i++) {    
playerB.ping(playerA, i);     
}    
}

他们的演奏速度约为每秒500,000 ping / pong。 到目前为止,一切都很好。 但是,当与仅使用一个线程的执行速度进行比较时,它突然看起来并不那么好。 在单个线程中运行的代码每秒可以执行超过20亿(2,681,850,373)个操作!

相差超过5,000次。 这让我很失望。 它产生的单线程代码在许多情况下比多线程代码更有效。

我开始寻找导致乒乓球运动员动作缓慢的原因。 经过一些调查和测试,我发现我用来在参与者之间传递消息的阻塞队列正在影响性能。

图2:具有单个生产者和单个消费者的SPSC队列

因此,我开始寻求Java中最快的队列实现之一作为替代。 我找到了Nitsan Wakart一个很棒的博客 。 他有几篇文章描述了单生产者/单消费者(SPSC)无锁队列的一些实现。 这些帖子的灵感来自马丁·汤普森(Martin Thompson)的无锁算法以实现最终性能 。

与基于锁原语的队列相比,无锁队列提供了更好的性能。 对于基于锁的队列,当一个线程获得锁时,其他线程将被阻塞,直到锁释放为止。 在无锁算法的情况下,生产者线程可以生成消息而不会被其他生产者线程阻塞,并且使用者从队列中读取时不会被其他使用者阻塞。

Martin Thompson的演讲和Nitsan的博客中描述的SPSC队列的性能结果令人难以置信- 超过100M ops / sec 。 它比JDK的并发队列实施快10倍以上(在具有4个内核的Intel Core i7上的性能约为8M ops / sec)。

带着极大的期待,我用无锁的SPSC队列实现替换了连接到每个参与者的链接阻塞队列。 可悲的是,性能测试并没有显着提高吞吐量。 很快就意识到瓶颈不是SPSC队列,而是多生产者/单一消费者(MPSC)队列。

在MPSC队列中使用SPSC队列不是一件容易的事; 多个生产者可以通过执行放置操作覆盖彼此的值。 SPSC队列只是没有代码来控制多个生产者的放置操作。 因此,即使最快的SPSC队列也无法解决我的问题。

对于多个生产者/单个消费者,我决定使用LMAX Disruptor –一种基于环形缓冲区的高性能线程间消息传递库。

图3:具有单个生产者和单个消费者的LMAX破坏者

通过使用Disruptor,很容易实现非常低延迟,高吞吐量的线程间消息通信。 它还为生产者和消费者的不同组合提供了用例。 几个线程可以从环形缓冲区读取而不会互相阻塞:

图4:具有单个生产者和两个消费者的LMAX Disruptor

多个生产者向环形缓冲区写入消息,而多个使用者从环形缓冲区获取消息的情况。

图5:具有两个生产者和两个消费者的LMAX Disruptor

快速搜索性能测试后,我发现了针对三个发布者和一个消费者的吞吐量测试 。 那正是医生命令的,并且产生了以下结果:

LinkedBlockingQueue

破坏者

运行0

4,550,625个操作/秒

11,487,650次操作/秒

运行1

4,651,162 ops / sec

11,049,723次操作/秒

运行2

4,404,316 ops / sec

11,142,061 ops / sec

在3个Producers / 1 Consumer案例中,Disruptor的速度是LinkedBlockingQueue的两倍以上。 但是,这与我期望的性能结果提高10倍相比还有很长的路要走。

我对这种顺序感到沮丧,我的头脑正在寻找解决方案。 由于命运的缘故,我最近修改了通勤路线,改用地铁代替旧的拼车。 突然间,一场遐想传遍了我,我的思绪开始将制图站映射到生产者和消费者。 在一个站点上,我们既有生产者(以载人的货车形式)也有消费者(与载人的货车相同)。

我创建了Railway类,并使用AtomicLong跟踪火车从车站到车站的经过。 对于简单的情况,我从单轨铁路开始。

public class RailWay {  
private final Train train = new Train();  
// the stationNo tracks the train and defines which station has the received train
private final AtomicInteger stationIndex = new AtomicInteger();
// Multiple threads access this method and wait for the train on the specific station. 
public Train waitTrainOnStation(final int stationNo) {
while (stationIndex.get() % stationCount != stationNo) {
Thread.yield(); // this is necessary to keep a high throughput of message passing.   
//But it eats CPU cycles while waiting for a train  
}  
// the busy loop returns only when the station number will match  
// stationIndex.get() % stationCount condition
return train;
}
// this method moves this train to the next station by incrementing the train station index…
public void sendTrain() {
stationIndex.getAndIncrement();
}
}

为了进行测试,我使用了Disruptor性能测试和SPSC队列测试中使用的相同条件-测试在线程之间传递长值。 我创建了以下Train类,其中包含一个长数组:

public class Train {   
//   
public static int CAPACITY = 2*1024;
private final long[] goodsArray; // array to transfer freight goods
private int index;
public Train() {   
goodsArray = new long[CAPACITY];     
}
public int goodsCount() { // returns the count of goods    
return index;    
}    
public void addGoods(long i) { // adds item to the train    
goodsArray[index++] = i;    
}    
public long getGoods(int i) { //removes the item from the train    
index--;    
return goodsArray[i];    
}    
}

然后,我编写了一个简单的测试 :两个线程通过火车在彼此之间传输长整型。

图6:具有单一生产者和单一消费者的铁路使用单一火车

public void testRailWay() {   
final Railway railway = new Railway();    
final long n = 20000000000l;    
//starting a consumer thread    
new Thread() {    
long lastValue = 0;
@Override   
public void run() {    
while (lastValue < n) {    
Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1    
int count = train.goodsCount();    
for (int i = 0; i < count; i++) {    
lastValue = train.getGoods(i); // unload goods    
}    
railway.sendTrain(); //sends the current train to the first station.    
}    
}    
}.start();
final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0    
int capacity = train.getCapacity();    
for (int j = 0; j < capacity; j++) {    
train.addGoods((int)i++); // adds goods to the train    
}    
railway.sendTrain();
if (i % 100000000 == 0) { //measures the performance per each 100M items   
final long duration = System.nanoTime() - start;|    
final long ops = (i * 1000L * 1000L * 1000L) / duration;    
System.out.format("ops/sec = %,d\n", ops);    
System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
System.out.format("latency nanos = %.3f%n\n", 
duration / (float)(i) * (float) Train.CAPACITY);    
}    
}    
}

通过以不同的火车容量运行测试,结果令我惊讶:

容量

吞吐量:ops / sec

延迟时间:ns

1个

5,190,883

192.6

2

10,282,820

194.5

32

104,878,614

305.1

256

344,614,640

742. 9

2048

608,112,493

3,367.8

32768

767,028,751

42,720.7

在两个线程之间传输消息的吞吐量达到767,028,751 ops / sec,列车容量为32,768长。 它比Nitsan博客中的SPSC队列快几倍。

继续铁路的思路,我考虑了如果我们有两列火车会发生什么? 我认为它应该同时提高吞吐量和减少延迟。 每个车站都有自己的火车 。 一列火车将在第一站装载货物,而第二列火车将在第二站卸载货物,反之亦然。

图7:具有单一生产者和单一消费者的铁路使用两列火车

这是吞吐量的结果:

容量

吞吐量:ops / sec

延迟时间:ns

1个

7,492,684

133.5

2

14,754,786

135.5

32

174,227,656

183.7

256

613,555,475

417.2

2048

940,144,900

2,178.4

32768

797,806,764

41,072.6

结果是惊人的。 它比单列火车的测试结果快1.4倍以上。 对于1的火车容量,等待时间从192.6纳秒减少到133.5纳秒; 显然是一个有希望的迹象。

因此,我的实验还没有结束。 对于2048的火车容量,在线程之间传输消息的等待时间为-2,178.4纳秒,这太多了。 我正在考虑如何减少这种情况,并创建了很多火车的案例:

图8:具有单一生产者和单一消费者的铁路使用许多火车

我还将火车容量减少到一个长值,并开始使用火车数量。 以下是测试结果:

火车数量

吞吐量:ops / sec

延迟时间:ns

2

10,917,951

91.6

32

31,233,310

32.0

256

42,791,962

23.4

1024

53,220,057

18.8

32768

71,812,166

13.9

使用32,768个训练,线程之间发送长值的等待时间减少到13.9纳秒。 当等待时间不是很高并且吞吐量不是那么低时,通过训练火车数量和火车容量,可以将吞吐量和等待时间调整到最佳平衡。

这些数字对于单一生产者和单一消费者(SPSC)而言非常有用。 但是我们如何才能为多个生产者和消费者使用这项功能? 答案很简单-添加更多电台!

图9:具有一个生产者和两个消费者的铁路

每个线程都会等待下一列火车,然后加载/卸载项目,然后将火车发送到下一站。 生产者线程将物品放到火车上,而消费者从火车上获得物品。 火车不断从一个车站到另一个车站绕圈行驶。

为了测试单一生产者/多个消费者(SPMC)案例,我创建了具有8个站点的铁路测试 。 一个站点属于单个生产者,而其他7个站点属于消费者。 结果是:

对于火车数量= 256和火车容量= 32:

ops/sec =116,604,397    
latency nanos = 274.4

对于火车数量= 32和火车容量= 256:

ops/sec =432,055,469    
latency nanos = 592.5

如您所见,即使使用八个工作线程,该测试也显示出不错的结果-432,055,469个操作/秒,包含32个列和256个long的容量。 在测试期间,所有CPU内核均加载到100%。

图10:8个站的铁路测试期间的CPU利用率

在使用Rails算法时,我几乎忘记了我的目标。 以提高“多个生产者/单个消费者”案例的性能。

图11:具有三个生产者和单个消费者的铁路

我创建了一个包含3个生产者和1个消费者的新测试。 每列火车从一个站点到另一个站点跟踪圆,而每个生产者仅承载每列火车容量的1/3。 每趟火车,消费者都会从三个生产者那里获得全部三个物品。 性能测试显示以下平均结果:

ops/sec = 162,597,109 
trains/sec = 54,199,036    
latency ns = 18.5

很好 生产者和消费者的工作速度超过1.6亿次操作/秒。

为了弥补差异,以下结果显示了针对同一案例-3个生产者和1个消费者的 Disruptor测试:

Run 0, Disruptor=11,467,889 ops/sec   
Run 1, Disruptor=11,280,315 ops/sec    
Run 2, Disruptor=11,286,681 ops/sec    
Run 3, Disruptor=11,254,924 ops/sec

在运行另一个具有消息批处理功能的Disruptor 3P:1C测试的结果下方(每批10条消息):

Run 0, Disruptor=116,009,280 ops/sec    
Run 1, Disruptor=128,205,128 ops/sec    
Run 2, Disruptor=101,317,122 ops/sec    
Run 3, Disruptor=98,716,683 ops/sec;

最后是Disruptor测试的结果,但使用3P:1C方案的LinkedBlockingQueue实现:

Run 0, BlockingQueue=4,546,281 ops/sec   
Run 1, BlockingQueue=4,508,769 ops/sec    
Run 2, BlockingQueue=4,101,386 ops/sec    
Run 3, BlockingQueue=4,124,561 ops/sec

如您所见,铁路方法提供的平均吞吐量为162,597,109 ops / sec,而在相同情况下使用Disruptor的最佳结果仅为128,205,128 ops / sec。 对于LinkedBlockingQueue,最佳结果仅为4,546,281 ops / sec。

铁路算法引入了一种简单的事件批处理方法,可显着提高吞吐量。 通过玩火车容量或火车数量,可以很容易地配置它以获得吞吐量/延迟的期望结果。

同样,铁路可以用于生产者和消费者的混合,在真正复杂的情况下使用,而同一线程可以用于消费消息,处理消息并将结果返回到环网:

图12:生产者和消费者混合的铁路

最后,我将提供针对超高吞吐量的单生产者/单消费者测试的优化:

图13:具有单一生产者和单一消费者的铁路

它具有以下平均结果:每秒吞吐量超过一亿五千万(1,569,884,271)次操作,而延迟等于1.3微秒。 如您所见,测试结果与本文开头描述的单线程测试的结果处于相同的数量级,每秒执行2,681,850,373次操作。

在这里,我将让您得出自己的结论。

我希望在以后的文章中演示如何为生产者和消费者的不同组合使用Queue和BlockingQueue接口支持Railway算法。 敬请关注。

翻译自: https://www.infoq.com/articles/High-Performance-Java-Inter-Thread-Communications/?topicPageSponsorship=c1246725-b0a7-43a6-9ef9-68102c8d48e1

java多线程间通信

这篇关于java多线程间通信_Java中的线程间通信以光速传输的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/164871

相关文章

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

Java中的密码加密方式

《Java中的密码加密方式》文章介绍了Java中使用MD5算法对密码进行加密的方法,以及如何通过加盐和多重加密来提高密码的安全性,MD5是一种不可逆的哈希算法,适合用于存储密码,因为其输出的摘要长度固... 目录Java的密码加密方式密码加密一般的应用方式是总结Java的密码加密方式密码加密【这里采用的

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

解决mybatis-plus-boot-starter与mybatis-spring-boot-starter的错误问题

《解决mybatis-plus-boot-starter与mybatis-spring-boot-starter的错误问题》本文主要讲述了在使用MyBatis和MyBatis-Plus时遇到的绑定异常... 目录myBATis-plus-boot-starpythonter与mybatis-spring-b