J.U.C Review - Stream并行计算原理源码分析

2024-09-07 06:52

本文主要是介绍J.U.C Review - Stream并行计算原理源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • Java 8 Stream简介
  • Stream单线程串行计算
  • Stream多线程并行计算
  • 源码分析Stream并行计算原理
  • Stream并行计算的性能提升

在这里插入图片描述

Java 8 Stream简介

自Java 8推出以来,开发者可以使用Stream接口和lambda表达式实现流式计算。这种编程风格不仅简化了对集合操作的代码,还提高了代码的可读性和性能。

Stream接口提供了多种集合操作方法,包括empty(判空)、filter(过滤)、max(求最大值)、findFirstfindAny(查找操作)等,使得对集合的操作更加灵活和直观。


Stream单线程串行计算

在默认情况下,Stream接口是以串行的方式运行的,这意味着所有的操作都在一个线程内执行。我们可以通过以下示例代码展示这一点:

public class StreamDemo {public static void main(String[] args) {Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).reduce((a, b) -> {System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}

在这个例子中,我们通过Stream.of()方法创建了一个包含数字1到9的流。随后,调用reduce方法对这些数字进行累加操作。reduce方法的作用是从前两个元素开始,执行指定操作(在此示例中为加法),然后将结果与下一个元素进行相同的操作,直到处理完所有元素。

程序的输出如下:

main: 1 + 2 = 3  
main: 3 + 3 = 6  
main: 6 + 4 = 10  
main: 10 + 5 = 15  
main: 15 + 6 = 21  
main: 21 + 7 = 28  
main: 28 + 8 = 36  
main: 36 + 9 = 45  
45

从输出可以看出,所有计算均由main线程执行,并且操作是严格按照元素顺序串行完成的。


Stream多线程并行计算

然而,单线程串行执行并不是唯一的选择。在现代多核处理器的时代,我们可以通过并行计算来更高效地利用计算资源。例如,当计算1+2=3的同时,我们可以在另一个线程中计算3+4=7,最后将这些部分结果进行合并。这种思想与Fork/Join框架的设计理念非常类似。

通过以下代码,我们可以让Stream在多线程中并行执行:

public class StreamParallelDemo {public static void main(String[] args) {Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).parallel().reduce((a, b) -> {System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}

运行这段代码,输出如下:

ForkJoinPool.commonPool-worker-1: 3 + 4 = 7  
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17  
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11  
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3  
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24  
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35  
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10  
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45  
45

从输出结果可以看出,这些计算是并行完成的,使用了ForkJoinPool中的commonPool线程池。尽管各个部分的计算是并行执行的,最终的结果仍然是正确的,因为Fork/Join框架负责协调这些并行任务。


源码分析Stream并行计算原理

通过以上的实践,我们知道Stream的并行计算底层是基于Fork/Join框架的。但具体是如何实现的?我们可以通过源码分析来探究。

首先,Stream.of()方法只是生成一个简单的流。接下来,我们查看parallel()方法的实现。由于这里的数据类型是int,因此调用的是BaseStream接口的parallel()方法。BaseStream接口的唯一实现类是AbstractPipeline类。以下是AbstractPipeline类的parallel()方法:

public final S parallel() {sourceStage.parallel = true;return (S) this;
}

这个方法的作用非常简单,仅仅是将sourceStage.parallel标志位设置为true,表示该流将以并行方式执行。

接下来,查看reduce方法的实现。Stream.reduce()方法的具体实现是通过ReferencePipeline这个抽象类,该类继承了AbstractPipeline类:

@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {return evaluate(ReduceOps.makeRef(accumulator));
}final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}@Override
public final boolean isParallel() {return sourceStage.parallel;
}

从源码可以看出,reduce方法调用了evaluate方法,而evaluate方法根据parallel标志位来决定是并行执行还是串行执行。如果paralleltrue,则调用evaluateParallel方法,否则调用evaluateSequential方法。

我们再来看evaluateParallel方法在ReduceOps.ReduceOp类中的具体实现:

@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

evaluateParallel方法创建了一个ReduceTask实例,并调用其invoke()方法来执行计算。ReduceTask类继承自AbstractTaskAbstractTask又继承自CountedCompleter,最终继承自ForkJoinTask。这就解释了为什么Stream的并行计算底层使用了Fork/Join框架。


Stream并行计算的性能提升

最后,我们通过一个简单的性能测试来验证Stream并行计算的优势。下面的代码演示了如何计算一千万个随机数的和,并比较串行计算和并行计算的时间开销:

public class StreamParallelDemo {public static void main(String[] args) {System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));Random random = new Random();List<Integer> list = new ArrayList<>(1000_0000);for (int i = 0; i < 1000_0000; i++) {list.add(random.nextInt(100));}long prevTime = getCurrentTime();list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));prevTime = getCurrentTime();list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));}private static long getCurrentTime() {return System.currentTimeMillis();}
}

在一台8核计算机上的输出结果如下:

本计算机的核数:8  
495156156  
单线程计算耗时:223  
495156156  
多线程计算耗时:95  

结果表明,在多核环境下,Stream的并行计算相比串行计算确实能够显著提升性能。然而,性能提升的幅度并非线性增长,因为线程管理和上下文切换本身也会带来一定的开销。如果在单核环境中,串行计算反而可能会比并行计算更快。

总结而言,Java 8的Stream并行计算通过简化代码的方式,利用了底层的多核资源,大幅提升了复杂集合操作的性能。然而在实际应用中,开发者需要根据具体的硬件环境和任务特性来决定是否使用并行计算。

在这里插入图片描述

这篇关于J.U.C Review - Stream并行计算原理源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144375

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud