【JUC】十、ForkJoin

2023-11-21 23:04
文章标签 juc forkjoin

本文主要是介绍【JUC】十、ForkJoin,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1、分支合并框架
  • 2、案例
  • 3、ForkJoinTask
  • 4、工作窃取算法
  • 5、ForkJoinPool

一个个任务执行在一个个线程上,倘若某一个任务耗时很久,期间其他线程都无事可做,显然没有利用好多核CPU这一计算机资源,因此,出现了"分而治之"的解决方案。

在这里插入图片描述

1、分支合并框架

Fork/Join即分支合并,用处是把一个大任务拆分成多个子任务来并行处理,最后再合并子任务的结果。(since Java1.7)

  • Fork:把一个复杂的任务进行拆分,大事化小
  • Join:把拆分的子任务的结果做合并

示意图:

在这里插入图片描述

举个实际例子:
在这里插入图片描述

2、案例

以下采用分支合并框架来实现1+2+…+100

在这里插入图片描述

通过继承RecursiveTask类自定义任务,泛型为result的类型:

//自定义任务类继承RecursiveTask
class MyTask extends RecursiveTask<Integer> {//拆分差值不超过10,来计算10以内的运算private static final int VALUE = 10;//拆分起始值private int begin;//拆分结束值private int end;//返回结果private int result;//构造中传入起始值public MyTask(int begin, int end) {this.begin = begin;this.end = end;}//拆分与合并@Overrideprotected Integer compute() {//判断即将要相加的两个数差值是否大于10if (end - begin <= VALUE) {for (int i = begin; i <= end; i++) {result = result + i;}//否,则进一步拆分} else {//中间值int middle = (begin+end) /2;//左边MyTask task01 = new MyTask(begin, middle);//右边MyTask task02 = new MyTask(middle + 1, end);//调用方法拆分task01.fork();task02.fork();//合并结果result = task01.join() + task02.join();}return result;}
}

以上自定义Task中体现的思路就是:问题足够小的时候,直接处理,否则就继续拆分。创建forkjoinpool对象,提交任务,获取执行结果:

public class ForkJoinDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建拆分任务对象MyTask myTask = new MyTask(0, 100);//创建分支合并池对象ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);//获取最终合并之后的结果Integer result = forkJoinTask.get();System.out.println(result);//关闭池对象forkJoinPool.shutdown();}}

结果:

5050

3、ForkJoinTask

在这里插入图片描述
ForkJoinTask有两个子类:Recursive即递归的意思

  • RecursiveTask:有返回值的任务
  • RecursiveAction:没有返回值的任务

ForkJoinTask还实现了Future接口,即说明是一个可以异步获取结果的任务。常用方法有:

  • fork:分割子任务,判断当前线程是否可以转型为ForkJoinWorkerThread类型,即是不是通过ForkJoinPool线程池执行的ForkJoinTask,是就push到队列里

在这里插入图片描述

  • 合并子任务结果:join
//调用方法拆分
task01.fork();
task02.fork();
//合并结果
result = task01.join() + task02.join();
  • 获取任务执行结果:get,即等待任务执行完成并返回任务计算结果

【方法源码详解参考这篇】

4、工作窃取算法

先看类ForkJoinWorkerThread类,它是执行前面ForkJoinTask的线程。ForkJoin框架使用的线程维护了一个双端队列,并通过工作窃取算法(work-stealing)来提高任务执行的效率。

在这里插入图片描述

工作窃取算法,在ForkJoin框架中就是一个线程的任务队列里的活儿都干完了,就去其他线程维护的任务队列的尾部窃取一个任务来执行。用张网图示意:

在这里插入图片描述

因此:

  • 对于线程自身维护的任务队列,线程自身执行任务的顺序是后进先出
  • 对于窃取其他线程队列的线程,则是先进先出(从队尾窃取并执行)

如图,对于线程1,任务的出入队都在队首,而负责窃取任务的线程2,其从队尾取任务,也就是说,只有线程1的任务队列只剩最后一个任务的时候,才会抢夺一次锁。如此,就大大提高了并发任务的执行效率。

5、ForkJoinPool

FoolJoinPool是运行ForkJoinTask的线程池,同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口

在这里插入图片描述

任务提交到ForkJoinPool后,池里维护的线程来执行任务

ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

其构造方法有四个参数:

public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode){
//.....

分别是:

  • parallelism:期望并发数,默认会使用Runtime.getRuntime().availableProcessors()的值,即当前计算机可用的CPU数量
  • factory:创建ForkJoin工作线程的工厂,默认为defaultForkJoinWorkerThreadFactory
  • handler:执行任务时遇到不可恢复的错误时的处理程序,默认为null
  • asyncMode:工作线程获取任务使用FIFO(先进先出)模式还是LIFO模式,默认为LIFO(后进先出)

这篇关于【JUC】十、ForkJoin的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/405721

相关文章

【编程底层思考】详解Java的JUC多线程并发编程底层组件AQS的作用及原理

Java中的AbstractQueuedSynchronizer(简称AQS)是位于java.util.concurrent.locks包中的一个核心组件,用于构建锁和其他同步器。AQS为实现依赖于FIFO(先进先出)等待队列的阻塞锁和相关同步器提供了一套高效、可扩展的框架。 一、AQS的作用 统一同步状态管理:AQS提供了一个int类型的成员变量state,用于表示同步状态。子类可以根据自己

JAVA并发编程JUC包之CAS原理

在JDK 1.5之后,java api中提供了java.util.concurrent包,简称JUC包。这个包定义了很多我们非常熟悉的工具类,比如原子类AtomicXX,线程池executors、信号量semaphore、阻塞队列、同步器等。日常并发编程要用的熟面孔基本都在这里。        首先,Atomic包,原子操作类,提供了用法简单、性能高效、最重要是线程安全的更新一个变量。支持

【硬刚Java并发】JUC基础(13):简介

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。

【硬刚Java并发】JUC基础(十二):ForkJoinPool 分支/合并框架

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 1  Fork/Join 框架 Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。 2 Fork/Join 框架与线程池的区别 采用 “工作窃取”模式(work-st

【硬刚Java并发】JUC基础(十一):线程调度

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 1 ScheduledExecutorService 一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。 package com.atguigu.juc;import java.util.Random;import java.util.concurrent.

【硬刚Java并发】JUC基础(十):线程池

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 线程池 第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑

【硬刚Java并发】JUC基础(九):线程八锁

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 1 线程八锁 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法锁的是当前对象this,被锁定后,其它的线程都不能

【硬刚Java并发】JUC基础(八):ReadWriteLock 读写锁

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 读-写锁 ReadWriteLock ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。。 ReadWriteLock 读取操作通常不会改变共享资源,但执行写入操作

【硬刚Java并发】JUC基础(七):Condition 控制线程通信

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 Condition Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的

【硬刚Java并发】JUC基础(六):Lock 同步锁

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Java并发部分补充。 显示锁 Lock 在 Java 5.0 之前,协调共享对象的访问时可以使用的机制只有 synchronized 和 volatile 。Java 5.0 后增加了一些新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。 ReentrantLock 实