深读源码-java线程系列之ForkJoinPool深入解析

2023-11-07 09:18

本文主要是介绍深读源码-java线程系列之ForkJoinPool深入解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

随着在硬件上多核处理器的发展和广泛使用,并发编程成为程序员必须掌握的一门技术,在面试中也经常考查面试者并发相关的知识。

今天,我们就来看一道面试题:

如何充分利用多核CPU,计算很大数组中所有整数的和?

剖析

  • 单线程相加?

我们最容易想到就是单线程相加,一个for循环搞定。

  • 线程池相加?

如果进一步优化,我们会自然而然地想到使用线程池来分段相加,最后再把每个段的结果相加。

  • 其它?

Yes,就是我们今天的主角——ForkJoinPool,但是它要怎么实现呢?似乎没怎么用过哈^^

三种实现

OK,剖析完了,我们直接来看三种实现,不墨迹,直接上菜。

/*** 计算1亿个整数的和*/
public class ForkJoinPoolTest01 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 构造数据int length = 100000000;long[] arr = new long[length];for (int i = 0; i < length; i++) {arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);}// 单线程singleThreadSum(arr);// ThreadPoolExecutor线程池multiThreadSum(arr);// ForkJoinPool线程池forkJoinSum(arr);}private static void singleThreadSum(long[] arr) {long start = System.currentTimeMillis();long sum = 0;for (int i = 0; i < arr.length; i++) {// 模拟耗时sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);}System.out.println("sum: " + sum);System.out.println("single thread elapse: " + (System.currentTimeMillis() - start));}private static void multiThreadSum(long[] arr) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();int count = 8;ExecutorService threadPool = Executors.newFixedThreadPool(count);List<Future<Long>> list = new ArrayList<>();for (int i = 0; i < count; i++) {int num = i;// 分段提交任务Future<Long> future = threadPool.submit(() -> {long sum = 0;for (int j = arr.length / count * num; j < (arr.length / count * (num + 1)); j++) {try {// 模拟耗时sum += (arr[j]/3*3/3*3/3*3/3*3/3*3);} catch (Exception e) {e.printStackTrace();}}return sum;});list.add(future);}// 每个段结果相加long sum = 0;for (Future<Long> future : list) {sum += future.get();}System.out.println("sum: " + sum);System.out.println("multi thread elapse: " + (System.currentTimeMillis() - start));}private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();// 提交任务ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(new SumTask(arr, 0, arr.length));// 获取结果Long sum = forkJoinTask.get();forkJoinPool.shutdown();System.out.println("sum: " + sum);System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));}private static class SumTask extends RecursiveTask<Long> {private long[] arr;private int from;private int to;public SumTask(long[] arr, int from, int to) {this.arr = arr;this.from = from;this.to = to;}@Overrideprotected Long compute() {// 小于1000的时候直接相加,可灵活调整if (to - from <= 1000) {long sum = 0;for (int i = from; i < to; i++) {// 模拟耗时sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);}return sum;}// 分成两段任务int middle = (from + to) / 2;SumTask left = new SumTask(arr, from, middle);SumTask right = new SumTask(arr, middle, to);// 提交左边的任务left.fork();// 右边的任务直接利用当前线程计算,节约开销Long rightResult = right.compute();// 等待左边计算完毕Long leftResult = left.join();// 返回结果return leftResult + rightResult;}}
}

实际上计算1亿个整数相加,单线程是最快的,我的电脑大概是100ms左右,使用线程池反而会变慢。

所以,为了演示ForkJoinPool的牛逼之处,我把每个数都/3*3/3*3/3*3/3*3/3*3了一顿操作,用来模拟计算耗时。

来看结果:

sum: 107352457433800662
single thread elapse: 789
sum: 107352457433800662
multi thread elapse: 228
sum: 107352457433800662
fork join elapse: 189

可以看到,ForkJoinPool相对普通线程池还是有很大提升的。

问题:普通线程池能否实现ForkJoinPool这种计算方式呢,即大任务拆中任务,中任务拆小任务,最后再汇总?

 

你可以试试看(-᷅_-᷄)

OK,下面我们正式进入ForkJoinPool的解析。

分治法

  • 基本思想

把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。

  • 步骤

(1)分割原问题:

(2)求解子问题:

(3)合并子问题的解为原问题的解。

在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。

  • 典型应用场景

(1)二分搜索

(2)大整数乘法

(3)Strassen矩阵乘法

(4)棋盘覆盖

(5)归并排序

(6)快速排序

(7)线性时间选择

(8)汉诺塔

ForkJoinPool继承体系

ForkJoinPool是 java 7 中新增的线程池类,它的继承体系如下:

 

ForkJoinPool和ThreadPoolExecutor都是继承自AbstractExecutorService抽象类,所以它和ThreadPoolExecutor的使用几乎没有多少区别,除了任务变成了ForkJoinTask以外。

这里又运用到了一种很重要的设计原则——开闭原则——对修改关闭,对扩展开放。

可见整个线程池体系一开始的接口设计就很好,新增一个线程池类,不会对原有的代码造成干扰,还能利用原有的特性。

ForkJoinTask

两个主要方法

  • fork()

fork()方法类似于线程的Thread.start()方法,但是它不是真的启动一个线程,而是将任务放入到工作队列中。

  • join()

join()方法类似于线程的Thread.join()方法,但是它不是简单地阻塞线程,而是利用工作线程运行其它任务。当一个工作线程中调用了join()方法,它将处理其它任务,直到注意到目标子任务已经完成了。

三个子类

  • RecursiveAction

无返回值任务。

  • RecursiveTask

有返回值任务。

  • CountedCompleter

无返回值任务,完成任务后可以触发回调。

ForkJoinPool内部原理

ForkJoinPool内部使用的是“工作窃取”算法实现的。

forkjoinpool

(1)每个工作线程都有自己的工作队列WorkQueue;

(2)这是一个双端队列,它是线程私有的;

(3)ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务;

(4)为了最大化地利用CPU,空闲的线程将从其它线程的队列中“窃取”任务来执行;

(5)从工作队列的尾部窃取任务,以减少竞争;

(6)双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;

(7)当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;

forkjoinpool

ForkJoinPool最佳实践

(1)最适合的是计算密集型任务;

(2)在需要阻塞工作线程时,可以使用ManagedBlocker;

(3)不应该在RecursiveTask的内部使用ForkJoinPool.invoke()/invokeAll();

总结

(1)ForkJoinPool特别适合于“分而治之”算法的实现;

(2)ForkJoinPool和ThreadPoolExecutor是互补的,不是谁替代谁的关系,二者适用的场景不同;

(3)ForkJoinTask有两个核心方法——fork()和join(),有三个重要子类——RecursiveAction、RecursiveTask和CountedCompleter;

(4)ForkjoinPool内部基于“工作窃取”算法实现;

(5)每个线程有自己的工作队列,它是一个双端队列,自己从队列头存取任务,其它线程从尾部窃取任务;

(6)ForkJoinPool最适合于计算密集型任务,但也可以使用ManagedBlocker以便用于阻塞型任务;

(7)RecursiveTask内部可以少调用一次fork(),利用当前线程处理,这是一种技巧;

彩蛋

ManagedBlocker怎么使用?

答:ManagedBlocker相当于明确告诉ForkJoinPool框架要阻塞了,ForkJoinPool就会启另一个线程来运行任务,以最大化地利用CPU。

请看下面的例子,自己琢磨哈^^。

/*** 斐波那契数列* 一个数是它前面两个数之和* 1,1,2,3,5,8,13,21*/
public class Fibonacci {public static void main(String[] args) {long time = System.currentTimeMillis();Fibonacci fib = new Fibonacci();int result = fib.f(1_000).bitCount();time = System.currentTimeMillis() - time;System.out.println("result= " + result);System.out.println("test1_000() time = " + time);}public BigInteger f(int n) {Map<Integer, BigInteger> cache = new ConcurrentHashMap<>();cache.put(0, BigInteger.ZERO);cache.put(1, BigInteger.ONE);return f(n, cache);}private final BigInteger RESERVED = BigInteger.valueOf(-1000);public BigInteger f(int n, Map<Integer, BigInteger> cache) {BigInteger result = cache.putIfAbsent(n, RESERVED);if (result == null) {int half = (n + 1) / 2;RecursiveTask<BigInteger> f0_task = new RecursiveTask<BigInteger>() {@Overrideprotected BigInteger compute() {return f(half - 1, cache);}};f0_task.fork();BigInteger f1 = f(half, cache);BigInteger f0 = f0_task.join();long time = n > 10_000 ? System.currentTimeMillis() : 0;try {if (n % 2 == 1) {result = f0.multiply(f0).add(f1.multiply(f1));} else {result = f0.shiftLeft(1).add(f1).multiply(f1);}synchronized (RESERVED) {cache.put(n, result);RESERVED.notifyAll();}} finally {time = n > 10_000 ? System.currentTimeMillis() - time : 0;if (time > 50)System.out.printf("f(%d) took %d%n", n, time);}} else if (result == RESERVED) {try {ReservedFibonacciBlocker blocker = new ReservedFibonacciBlocker(n, cache);ForkJoinPool.managedBlock(blocker);result = blocker.result;} catch (InterruptedException e) {throw new CancellationException("interrupted");}}return result;// return f(n - 1).add(f(n - 2));}private class ReservedFibonacciBlocker implements ForkJoinPool.ManagedBlocker {private BigInteger result;private final int n;private final Map<Integer, BigInteger> cache;public ReservedFibonacciBlocker(int n, Map<Integer, BigInteger> cache) {this.n = n;this.cache = cache;}@Overridepublic boolean block() throws InterruptedException {synchronized (RESERVED) {while (!isReleasable()) {RESERVED.wait();}}return true;}@Overridepublic boolean isReleasable() {return (result = cache.get(n)) != RESERVED;}}
}

原文链接:https://www.cnblogs.com/tong-yuan/p/11824018.html

这篇关于深读源码-java线程系列之ForkJoinPool深入解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/362647

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory