本文主要是介绍【并发编程篇】详解Forkjoin,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 🍔什么是Forkjoin
- 🎈Forkjoin的方法
- 🎆代码实现
🍔什么是Forkjoin
Fork/Join 是一种在多线程领域中常用的算法或技术,它的核心思想是将大任务分割成若干个小任务,然后将这些小任务分配给多个线程并行处理,最终将结果合并起来。这种思想可以应用于多种场景,例如图像处理、批处理、并行排序等。
在 Java 中,Fork/Join 这种思想被封装在了 java.util.concurrent 包中的 ForkJoinPool 类和 RecursiveTask 类中。ForkJoinPool 类是一个线程池,用于管理多个线程的执行,而 RecursiveTask 类则是一个抽象类,用于定义可分解的任务。通过使用这些类,开发者可以非常方便地实现 Fork/Join 的并行计算功能,从而提高应用程序的性能和效率。
总之,Fork/Join 并不是一个框架,而是一种并发编程技术,它可以帮助开发者实现高效的并行计算,并发挥多核 CPU 的计算能力。
🎈Forkjoin的方法
Fork/Join 框架提供了一些核心的方法来支持任务的分解和合并,下面我会对这些方法进行理论讲解:
- fork() 方法:fork() 方法用于将当前任务进行分解,将其拆分成更小的子任务并提交给 Fork/Join 框架进行并行处理。该方法会异步地启动一个新的子任务,并立即返回,不会阻塞当前线程。
- join() 方法:join() 方法用于等待子任务的执行完成,并获取其结果。在调用 join() 方法之前,程序会阻塞当前线程,直到子任务的执行完成。如果子任务还没有完成,则当前线程会暂停执行,转而执行其他可执行任务,从而实现工作窃取的效果。
- invoke() 方法:invoke() 方法用于提交一个任务给 Fork/Join 框架进行处理,并返回任务的执行结果。该方法会同步地启动一个任务,并阻塞当前线程,直到任务执行完成并返回结果。这个方法通常用于提交根任务或最顶层的任务。
- RecursiveTask 类和 RecursiveAction 类:Fork/Join 框架提供了两个抽象类 RecursiveTask 和 RecursiveAction,用于定义可分解的任务。RecursiveTask 适用于需要返回结果的任务,而 RecursiveAction 适用于不需要返回结果的任务。这两个类都有一个抽象方法 compute(),我们需要在子类中实现该方法来定义具体的任务逻辑。
- Work-Stealing(工作窃取):Fork/Join 框架采用了工作窃取算法,使得线程在处理完自己的任务后可以从其他线程的队列中偷取任务来执行。这种方式可以提高并行计算的效率和负载均衡性能。工作窃取是通过双端队列实现的,每个线程都有自己的任务队列,当一个线程完成自己队列中的任务后,它会尝试从其他线程的队列末尾窃取任务来执行。
这些方法和概念是 Fork/Join 框架中非常重要的部分,它们通过任务的分解、合并和工作窃取机制,实现了高效的并行计算。理解并熟练使用这些方法可以帮助开发者更好地利用 Fork/Join 框架来处理并行计算任务。
🎆代码实现
ForkjoinDemo.java
package org.Test6;import java.util.concurrent.RecursiveTask;public class ForkjoinDemo extends RecursiveTask<Long> {private Long start; // 1private Long end; // 1990900000// 临界值private Long temp = 10000L;public ForkjoinDemo(Long start, Long end) {this.start = start;this.end = end;}// 计算方法@Overrideprotected Long compute() {if ((end - start) < temp) {Long sum = 0L;for (Long i = start; i <= end; i++) {sum += i;}return sum;} else { // forkjoin 递归long middle = (start + end) / 2; // 中间值ForkjoinDemo task1 = new ForkjoinDemo(start, middle);task1.fork(); // 拆分任务,把任务压入线程队列ForkjoinDemo task2 = new ForkjoinDemo(middle + 1, end);task2.fork(); // 拆分任务,把任务压入线程队列return task1.join() + task2.join();}}
}
Test.java
package org.Test6;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;public class Test {public static void test1() {Long sum = 0L;long start = System.currentTimeMillis();for (Long i = 1L; i <= 10_0000_0000; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("sum=" + sum + " 时间:" + (end - start));}// 会使用ForkJoinpublic static void test2() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Long> task = new ForkjoinDemo(0L, 10_0000_0000L);ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("sum=" + sum + " 时间:" + (end - start));}public static void test3() {long start = System.currentTimeMillis();// Stream并行流long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);long end = System.currentTimeMillis();System.out.println("sum=" + sum + " 时间:" + (end - start));}public static void main(String[] args) throws ExecutionException, InterruptedException {test1();test2();test3();}}
在技术的道路上,我们不断探索、不断前行,不断面对挑战、不断突破自我。科技的发展改变着世界,而我们作为技术人员,也在这个过程中书写着自己的篇章。让我们携手并进,共同努力,开创美好的未来!愿我们在科技的征途上不断奋进,创造出更加美好、更加智能的明天!
这篇关于【并发编程篇】详解Forkjoin的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!