ForkJoin 框架 源码解析

2024-03-24 09:08
文章标签 源码 框架 解析 forkjoin

本文主要是介绍ForkJoin 框架 源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ForkJoin 框架

work-stealing框架规则

框架规则:
I.每一个工作线程都维护自己调度队列中的可运行任务。
II.调度队列支持队列和栈二项性,既支持LIFO(后进先出,栈模式),又支持FIFO(先进先出, 队列模式)。
III.工作线程处理任务时,fork产生的子任务,将会放入到自己的调度队列中。
IV.工作线程使用LIFO(栈模式),来处理自身调度队列中的任务。
V.当某个工作线程的调度队列无任务时,随机一个别的调度队列,通过FIFO模式(先进先出, 队列模式), 取出(窃取)一个任务去运行。
VI.当一个工作线程调用了join操作,被join的目标任务如果没有完成,线程可能偷取其他任务进行处理,直到目标任务已经结束(通过isDone方法)。所有的任务都会无阻塞的完成。
VII.当一个工作线程本地队列没有任务处理,同时也无法从线程池中的其他队列中偷取到任务时,它就会退出扫描(通过yield、sleep和/或者优先级调整)一段时间,超时后再度尝试,除非所有的工作线程都处于空闲的状态。在这种情况下,他们都会阻塞直到新的任务加入。

补充:每个工作线程都会有自己的调度队列,但调度队列也可能没有工作线程

基本算法

protected long compute() {if (end - start <= THRESHOLD) {return doResult();} else {high.fork();low.fork();return low.join() + high.join();}
}

框架说明

Forkjoin框架主要是采用分治思想来做处理,把大任务切分成若干小任务,然后小任务再切成更小的任务,以此递归,直到切到最小单元处理任务为止。任务处理上线程采用work-stealing规则进行处理。简单的任务切分模式如下图所示:
在这里插入图片描述

框架组成

I. ForkJoinPool: 线程池类,管理任务队列,队列数组中,奇数索引位是带有工作线程的任务队列,偶数索引位(64个槽位)是不带工作线程的共享队列。提供了任务提交入口,和任务偷取框架核心运行支持。
II. ForkJoinPool.WorkQueue: 任务环形双端队列,实现了队列和栈的二项性。队列采用cas原子操作实现多线程安全。
III.ForkJoinWorkThread: 工作线程,实现了向线程池奇数索引注册工作队列。
IV. ForkJoinTask: 任务抽象类,提供了fork和join操作接口,子类有三个,分别是:
1.RecursiveAction,递归无返回结果的任务, compute拆分子任务
2.RecursiveTask, 递归有返回结果的任务,compute拆分子任务
3.CountedCompleter, 一个特殊的计数任务类,类中维护着子任务计数器pending和父任务指针,这样就形成了,从根任务到子任务一个树形结构,每个结点都有孩子的计数器pending,一个任务完成后会调用完成接口, 如果子任务数pending递减为0,会回调任务完成接口和再递归调用父任务的pending递减操作,然后再判断父任务的pending是否递减到0…,就这样递归调用,直到根任务pending递减到0,则此根任务下的所有任务完成。此类任务,ForkJoin框架中作了特殊处理,任务的运行偷取都做了相对的特殊处理,因此在运行性能上会更高。

主要常量和成员变量

 // Bounds/* 如果并发线程数是MAX_CAP=0x7fff, 则队列数组的最大长度是0x10000*,所提供的偶数索引槽数:0x8000, 奇数索引槽数:0x8000, 所有的最大非工作线程的队列有槽位:64 < 0x8000,*    最大并发工作线程队列所在奇数槽位:0x7fff < 0x8000, *所以队列数组不会超过SMASK = 0xffff*/static final int SMASK        = 0xffff;        // short bits == max index//即最大的工作线程并发数static final int MAX_CAP      = 0x7fff;        // max #workers - 1//队列数组索引的偶数掩码static final int EVENMASK     = 0xfffe;        // even short bits//最大64个偶数槽位,用来非工作线程队列的偶数索引位static final int SQMASK       = 0x007e;        // max 64 (even) slots// Masks and units for WorkQueue.scanState and ctl sp subfield//任务被运行处理的时候,会标记此状态,表示任务正在扫描static final int SCANNING     = 1;             // false when running tasks//任务处于空闲状态,未被激活static final int INACTIVE     = 1 << 31;       // must be negative//ctl 低16位SS(栈顶队列池化索引)的版本号(防止CAS的ABA问题)static final int SS_SEQ       = 1 << 16;       // version count// Mode bits for ForkJoinPool.config and WorkQueue.config//int低16位掩码static final int MODE_MASK    = 0xffff << 16;  // top half of int//后进先出,栈模式static final int LIFO_QUEUE   = 0;//先进先出,队列模式static final int FIFO_QUEUE   = 1 << 16;//共享队列模式
static final int SHARED_QUEUE = 1 << 31;       // must be negative//ID SS, ctl 低32位掩码private static final long SP_MASK    = 0xffffffffL;//UC, 高32位掩码private static final long UC_MASK    = ~SP_MASK;// Active counts//活动的工作线程移位操作private static final int  AC_SHIFT   = 48;//活跃的线程 高16位+1增量单位private static final long AC_UNIT    = 0x0001L << AC_SHIFT;//活跃线程AC,高16位掩码private static final long AC_MASK    = 0xffffL << AC_SHIFT;// Total counts//总的工作线程TC,移位private static final int  TC_SHIFT   = 32;//TC增量单位+1,高(32-48)位private static final long TC_UNIT    = 0x0001L << TC_SHIFT;//tc, 32-48掩码private static final long TC_MASK    = 0xffffL << TC_SHIFT;//增加工作线程的位运算,最高位1,如果AC最高位1,则为负数未达到并发线程量,否则为0,则达到最大并发量private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);// runState bits: SHUTDOWN must be negative, others arbitrary powers of two//lockprivate static final int  RSLOCK     = 1;//通知状态private static final int  RSIGNAL    = 1 << 1;//开始状态private static final int  STARTED    = 1 << 2;//停止状态private static final int  STOP       = 1 << 29;//终止状态private static final int  TERMINATED = 1 << 30;//关闭状态private static final int  SHUTDOWN   = 1 << 31;// Instance fields//池控制,AC (48-64), TC(32-48), SS(16-32), ID(0-16)
volatile long ctl;                   // main pool control
//线程池运行状态
volatile int runState;               // lockable status
//配置,并行度和队列模式进行逻辑或运算组合
final int config;                    // parallelism, mode
//随机种子
int indexSeed;                       // to generate worker index
//线程池的工作队列数组,奇数索引放有工作线程的队列,偶数索引(64槽位)共享工作队列
volatile WorkQueue[] workQueues;     // main registry
//工作线程工厂类
final ForkJoinWorkerThreadFactory factory;
//每个线程的异常处理
final UncaughtExceptionHandler ueh;  // per-worker UEH
//创建工作线程的名字前缀
final String workerNamePrefix;       // to create worker name string
//所有工作线程偷取任务数计数器
volatile AtomicLong stealCounter;    // also used as sync monitor

ctl 每个16位字段意义:

ac 高位(48-64)tc 高位(32-48)ss 低位(16-32)id 低位(0-16)
0000000000000000000000000000000000000000000000000000000000000000

ctl池控制长整形表示,总共有64位,从低位到高位每16位为一个字段,共分成4个字段,这个4个字段的意义如下:
AC: 活动线程数量,线程池构造初始化是负的最大并行度。当有新的工作线程加入或者激活时,就会 +1,直到ac加到0时,激活线程达到最大并行度了。设计成负数的好处是判断ctl是负值时,就说明活动线程ac没有达到饱和,还可以继续增加或者激活工作线程。
TC:总工作线程数量,线程池构造初始化是负的最大并行度。当有新的工作线程加入或者激活时,就会 +1,直到tc加到0时,总线程数达到最大并行度了。一般情况下ac值小于等于tc.因为在没有任务处理的情况下,可能总工作线程不变,激活的线程会变成空闲线程,这时ac值就会变小了。
SS:栈顶等待线程(最后一个处于空闲状态的等待线程)的版本号和状态,版本的控制是16-31位,每次版本变更做+1,栈顶线程状态是第32位来控制,当第32位为1时,表示空闲,0时表示激活。
ID: 栈顶等待线程在线程池中的工作队列数组索引。

ForkJoinPool说明

1.工作线程任务队列:此类型队列拥有一个工作线程,此类型队列任务只能是自身工作线程的fork任务。
2. 共享任务队列:此类型队列没有工作线程,此类型的队列任务是由外部程序提交的任务。
3. 线程池任务队列数组索引规则:偶数位是共享任务队列,共享任务队列只有64个偶数索引槽位。奇数索引是工作线程任务队列。
4. 任务扫描(偷取FIFO模式): 工作线程会生成一个随机开始索引,从池队列数组中开始扫描任务队列(包括自己的队列),a. 扫描到有效的任务,则开始处理任务,任务处理完成后,则开始处理自己内部队列的中的全部任务(LIFO),内部队列任务处理完成后。重新继续扫描任务。b. 未扫描到有效任务,则工作线程设置成空闲状态,并进行等待,如果等待成功,则进行park, 有新任务进入后,会被unpark唤醒,否则工作线程会进入销毁流程。
5. 任务提交:外部任务提交时会push到偶数索引位的共享队列中,任务处理过程中有compute产生的子任务,则子任务会被fork到当前工作线程任务队列中。
6.计数任务处理:这是一个特殊的任务。线程池有针对此任务会做特殊处理。任务awaitJoin时对计数任务调用帮助完成任务时,会帮助完成任务结点下的所有子任务。
7. 帮助偷取处理任务:任务在doJoin时,可能会调到awaitJoin, awaitJoin中,任务如果是 a.计算任务则会调用帮助完成任务(参考6). b.普通任务, 则会调用帮助偷取任务处理,以免当前工作线程空转等待任务完成,这样担任工作线程的利用率,提升了任务并发处理效率。

框架示意图:
在这里插入图片描述

源码说明

外部提交任务externalPush

//外部提交任务,此任务会提交到64个偶数槽中的任务队列其中一个,线程探针是通过提交线程生成的,
//通过线程探针生成队列数组索引,如果该索引下有队列则push到队列中,否则调用externalSubmit(), 即走完队列初始化全部流程加入到任务队列中

  final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;//获取当前线程的探针int r = ThreadLocalRandom.getProbe();//线程池运行状态int rs = runState;//线程池队列数组有效if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&//根据r,获取SQMASK 掩码64个槽位中的r位索引,判断索引位队列是否有效(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&//队列加锁U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;//判断队列中的任务数是否满了if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {//top偏移位int j = ((am & s) << ASHIFT) + ABASE;//放入任务到队列中U.putOrderedObject(a, j, task);//top指针加1U.putOrderedInt(q, QTOP, s + 1);//unlockU.putIntVolatile(q, QLOCK, 0);//任务数=1,如果正好任务被某个工作线程偷走,其他线程扫描时没扫描到处于空闲等待状态,//<1可能其他线程都在空闲等待中,所以这时通知空闲线程有任务要处理了if (n <= 1)signalWork(ws, q);return;}//unlockU.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);
}

外部提交任务externalSubmit

//外部任务提交初始化一套流程,初始线程池任务队列数组,初始化新的偶数槽位(最大64个槽位)队列//初始化完成后,如果提交任务成功,则通知工作线程处理任务。private void externalSubmit(ForkJoinTask<?> task) {int r;                                    // initialize caller's probe//调用者线程探针初始化,然后再取随机探针if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;//线程池被终止了(SHUTDOWN),  帮助关闭if ((rs = runState) < 0) {tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}//如果运行状态未开始,或者池化队列数组未初始化,则进行初始化else if ((rs & STARTED) == 0 ||     // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;//lockrs = lockRunState();try {//如果运行状态未开始if ((rs & STARTED) == 0) {//初始化偷取计数U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of two//并行工作线程数int p = config & SMASK; // ensure at least 2 slots//并行数超出1,取 -1,否则取1int n = (p > 1) ? p - 1 : 1;//保证2的幂次方,这样就可以通过位运行取模n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;//new 队列数组workQueues = new WorkQueue[n];ns = STARTED;}} finally {//unlock 并加上运行初始化状态unlockRunState(rs, (rs & ~RSLOCK) | ns);}}//和SQMASK逻辑运算后,结果会变成偶数,只有64个糟位, 根据探针r取偶数索引位的队列else if ((q = ws[k = r & m & SQMASK]) != null) {//队列lockif (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry {                      // locked version of push//队列任务数未满if ((a != null && a.length > s + 1 - q.base) ||//或者队列扩容成功(a = q.growArray()) != null) {//top在数组中的偏移量int j = (((a.length - 1) & s) << ASHIFT) + ABASE;//放入队列U.putOrderedObject(a, j, task);//top + 1U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {//unlockU.compareAndSwapInt(q, QLOCK, 1, 0);}//任务提交成功,通知工作线程处理任务if (submitted) {signalWork(ws, q);return;}}move = true;                   // move on failure}//如果线程池运行状态未被锁,创建新任务队列else if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);//记录探针值,q.hint = r;//配置成共享队列,所有的工作线程都可以偷取处理,此队列在偶数位,没有被工作线程拥有q.config = k | SHARED_QUEUE;//队列状态标记为未激活q.scanState = INACTIVE;//lockrs = lockRunState();           // publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)//队列放入线程池队列数组中ws[k] = q;                 // else terminated//unlockunlockRunState(rs, rs & ~RSLOCK);}elsemove = true;                   // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}}

通知工作线程处理任务signalWork

//通知工作线程处理任务,如果工作线程未达到并行度,则创建新的工作线程。
//否则查看ctl栈顶处于空闲状态的工作线程,如果有,则唤醒

  final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;//激活的线程量未达到总并发量while ((c = ctl) < 0L) {                       // too few active//ctl栈顶未有waitersif ((sp = (int)c) == 0) {                  // no idle workers//ctl 最高位1,<0,未达到并发量if ((c & ADD_WORKER) != 0L)            // too few workers//增加工作线程tryAddWorker(c);break;}//工作线程数组为空,退出if (ws == null)                            // unstarted/terminatedbreak;//ctl栈顶waiter索引,超出了工作线程数组长度,?if (ws.length <= (i = sp & SMASK))         // terminatedbreak;//此索引的工作线程被撤销停止了if ((v = ws[i]) == null)                   // terminatingbreak;//增加ctl栈顶工作线程的版本号,并设置激活状态int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState//ctl栈顶状态和工作线程状态比较int d = sp - v.scanState;                  // screen CAS//激活线程数+1,栈顶状态工作线程以前的状态long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);//原子更新ctl, 如果更新不成功说明被其他线程激活了或者阻塞超时了if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {//更新工作线程带新版本的状态v.scanState = vs;                      // activate v//如果阻塞调用者不为空,则唤醒工作线程,并退出if ((p = v.parker) != null)U.unpark(p);break;}//如果队列,是没有数据,则退出if (q != null && q.base == q.top)          // no more workbreak;}}

增加工作线程tryAddWorker

//尝试增加一个工作线程

  private void tryAddWorker(long c) {boolean add = false;do {long nc = ((AC_MASK & (c + AC_UNIT)) | //激活线程数量+1(TC_MASK & (c + TC_UNIT))); //工作线程总数+1//ctl 没有变化if (ctl == c) {int rs, stop;                 // check if terminating//加锁,并判断线程池是否停止

这篇关于ForkJoin 框架 源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/841118

相关文章

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核

python解析HTML并提取span标签中的文本

《python解析HTML并提取span标签中的文本》在网页开发和数据抓取过程中,我们经常需要从HTML页面中提取信息,尤其是span元素中的文本,span标签是一个行内元素,通常用于包装一小段文本或... 目录一、安装相关依赖二、html 页面结构三、使用 BeautifulSoup javascript

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL