本文主要是介绍ForkJoin 框架 源码解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
ForkJoin 框架
work-stealing框架规则
框架规则:
I.每一个工作线程都维护自己调度队列中的可运行任务。
II.调度队列支持队列和栈二项性,既支持LIFO(后进先出,栈模式),又支持FIFO(先进先出, 队列模式)。
III.工作线程处理任务时,fork产生的子任务,将会放入到自己的调度队列中。
IV.工作线程使用LIFO(栈模式),来处理自身调度队列中的任务。
V.当某个工作线程的调度队列无任务时,随机一个别的调度队列,通过FIFO模式(先进先出, 队列模式), 取出(窃取)一个任务去运行。
VI.当一个工作线程调用了join操作,被join的目标任务如果没有完成,线程可能偷取其他任务进行处理,直到目标任务已经结束(通过isDone方法)。所有的任务都会无阻塞的完成。
VII.当一个工作线程本地队列没有任务处理,同时也无法从线程池中的其他队列中偷取到任务时,它就会退出扫描(通过yield、sleep和/或者优先级调整)一段时间,超时后再度尝试,除非所有的工作线程都处于空闲的状态。在这种情况下,他们都会阻塞直到新的任务加入。
补充:每个工作线程都会有自己的调度队列,但调度队列也可能没有工作线程
基本算法
protected long compute() {if (end - start <= THRESHOLD) {return doResult();} else {high.fork();low.fork();return low.join() + high.join();}
}
框架说明
Forkjoin框架主要是采用分治思想来做处理,把大任务切分成若干小任务,然后小任务再切成更小的任务,以此递归,直到切到最小单元处理任务为止。任务处理上线程采用work-stealing规则进行处理。简单的任务切分模式如下图所示:
框架组成
I. ForkJoinPool: 线程池类,管理任务队列,队列数组中,奇数索引位是带有工作线程的任务队列,偶数索引位(64个槽位)是不带工作线程的共享队列。提供了任务提交入口,和任务偷取框架核心运行支持。
II. ForkJoinPool.WorkQueue: 任务环形双端队列,实现了队列和栈的二项性。队列采用cas原子操作实现多线程安全。
III.ForkJoinWorkThread: 工作线程,实现了向线程池奇数索引注册工作队列。
IV. ForkJoinTask: 任务抽象类,提供了fork和join操作接口,子类有三个,分别是:
1.RecursiveAction,递归无返回结果的任务, compute拆分子任务
2.RecursiveTask, 递归有返回结果的任务,compute拆分子任务
3.CountedCompleter, 一个特殊的计数任务类,类中维护着子任务计数器pending和父任务指针,这样就形成了,从根任务到子任务一个树形结构,每个结点都有孩子的计数器pending,一个任务完成后会调用完成接口, 如果子任务数pending递减为0,会回调任务完成接口和再递归调用父任务的pending递减操作,然后再判断父任务的pending是否递减到0…,就这样递归调用,直到根任务pending递减到0,则此根任务下的所有任务完成。此类任务,ForkJoin框架中作了特殊处理,任务的运行偷取都做了相对的特殊处理,因此在运行性能上会更高。
主要常量和成员变量
// Bounds/* 如果并发线程数是MAX_CAP=0x7fff, 则队列数组的最大长度是0x10000*,所提供的偶数索引槽数:0x8000, 奇数索引槽数:0x8000, 所有的最大非工作线程的队列有槽位:64 < 0x8000,* 最大并发工作线程队列所在奇数槽位:0x7fff < 0x8000, *所以队列数组不会超过SMASK = 0xffff*/static final int SMASK = 0xffff; // short bits == max index//即最大的工作线程并发数static final int MAX_CAP = 0x7fff; // max #workers - 1//队列数组索引的偶数掩码static final int EVENMASK = 0xfffe; // even short bits//最大64个偶数槽位,用来非工作线程队列的偶数索引位static final int SQMASK = 0x007e; // max 64 (even) slots// Masks and units for WorkQueue.scanState and ctl sp subfield//任务被运行处理的时候,会标记此状态,表示任务正在扫描static final int SCANNING = 1; // false when running tasks//任务处于空闲状态,未被激活static final int INACTIVE = 1 << 31; // must be negative//ctl 低16位SS(栈顶队列池化索引)的版本号(防止CAS的ABA问题)static final int SS_SEQ = 1 << 16; // version count// Mode bits for ForkJoinPool.config and WorkQueue.config//int低16位掩码static final int MODE_MASK = 0xffff << 16; // top half of int//后进先出,栈模式static final int LIFO_QUEUE = 0;//先进先出,队列模式static final int FIFO_QUEUE = 1 << 16;//共享队列模式
static final int SHARED_QUEUE = 1 << 31; // must be negative//ID SS, ctl 低32位掩码private static final long SP_MASK = 0xffffffffL;//UC, 高32位掩码private static final long UC_MASK = ~SP_MASK;// Active counts//活动的工作线程移位操作private static final int AC_SHIFT = 48;//活跃的线程 高16位+1增量单位private static final long AC_UNIT = 0x0001L << AC_SHIFT;//活跃线程AC,高16位掩码private static final long AC_MASK = 0xffffL << AC_SHIFT;// Total counts//总的工作线程TC,移位private static final int TC_SHIFT = 32;//TC增量单位+1,高(32-48)位private static final long TC_UNIT = 0x0001L << TC_SHIFT;//tc, 32-48掩码private static final long TC_MASK = 0xffffL << TC_SHIFT;//增加工作线程的位运算,最高位1,如果AC最高位1,则为负数未达到并发线程量,否则为0,则达到最大并发量private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);// runState bits: SHUTDOWN must be negative, others arbitrary powers of two//lockprivate static final int RSLOCK = 1;//通知状态private static final int RSIGNAL = 1 << 1;//开始状态private static final int STARTED = 1 << 2;//停止状态private static final int STOP = 1 << 29;//终止状态private static final int TERMINATED = 1 << 30;//关闭状态private static final int SHUTDOWN = 1 << 31;// Instance fields//池控制,AC (48-64), TC(32-48), SS(16-32), ID(0-16)
volatile long ctl; // main pool control
//线程池运行状态
volatile int runState; // lockable status
//配置,并行度和队列模式进行逻辑或运算组合
final int config; // parallelism, mode
//随机种子
int indexSeed; // to generate worker index
//线程池的工作队列数组,奇数索引放有工作线程的队列,偶数索引(64槽位)共享工作队列
volatile WorkQueue[] workQueues; // main registry
//工作线程工厂类
final ForkJoinWorkerThreadFactory factory;
//每个线程的异常处理
final UncaughtExceptionHandler ueh; // per-worker UEH
//创建工作线程的名字前缀
final String workerNamePrefix; // to create worker name string
//所有工作线程偷取任务数计数器
volatile AtomicLong stealCounter; // also used as sync monitor
ctl 每个16位字段意义:
ac 高位(48-64) | tc 高位(32-48) | ss 低位(16-32) | id 低位(0-16) |
---|---|---|---|
0000000000000000 | 0000000000000000 | 0000000000000000 | 0000000000000000 |
ctl池控制长整形表示,总共有64位,从低位到高位每16位为一个字段,共分成4个字段,这个4个字段的意义如下:
AC: 活动线程数量,线程池构造初始化是负的最大并行度。当有新的工作线程加入或者激活时,就会 +1,直到ac加到0时,激活线程达到最大并行度了。设计成负数的好处是判断ctl是负值时,就说明活动线程ac没有达到饱和,还可以继续增加或者激活工作线程。
TC:总工作线程数量,线程池构造初始化是负的最大并行度。当有新的工作线程加入或者激活时,就会 +1,直到tc加到0时,总线程数达到最大并行度了。一般情况下ac值小于等于tc.因为在没有任务处理的情况下,可能总工作线程不变,激活的线程会变成空闲线程,这时ac值就会变小了。
SS:栈顶等待线程(最后一个处于空闲状态的等待线程)的版本号和状态,版本的控制是16-31位,每次版本变更做+1,栈顶线程状态是第32位来控制,当第32位为1时,表示空闲,0时表示激活。
ID: 栈顶等待线程在线程池中的工作队列数组索引。
ForkJoinPool说明
1.工作线程任务队列:此类型队列拥有一个工作线程,此类型队列任务只能是自身工作线程的fork任务。
2. 共享任务队列:此类型队列没有工作线程,此类型的队列任务是由外部程序提交的任务。
3. 线程池任务队列数组索引规则:偶数位是共享任务队列,共享任务队列只有64个偶数索引槽位。奇数索引是工作线程任务队列。
4. 任务扫描(偷取FIFO模式): 工作线程会生成一个随机开始索引,从池队列数组中开始扫描任务队列(包括自己的队列),a. 扫描到有效的任务,则开始处理任务,任务处理完成后,则开始处理自己内部队列的中的全部任务(LIFO),内部队列任务处理完成后。重新继续扫描任务。b. 未扫描到有效任务,则工作线程设置成空闲状态,并进行等待,如果等待成功,则进行park, 有新任务进入后,会被unpark唤醒,否则工作线程会进入销毁流程。
5. 任务提交:外部任务提交时会push到偶数索引位的共享队列中,任务处理过程中有compute产生的子任务,则子任务会被fork到当前工作线程任务队列中。
6.计数任务处理:这是一个特殊的任务。线程池有针对此任务会做特殊处理。任务awaitJoin时对计数任务调用帮助完成任务时,会帮助完成任务结点下的所有子任务。
7. 帮助偷取处理任务:任务在doJoin时,可能会调到awaitJoin, awaitJoin中,任务如果是 a.计算任务则会调用帮助完成任务(参考6). b.普通任务, 则会调用帮助偷取任务处理,以免当前工作线程空转等待任务完成,这样担任工作线程的利用率,提升了任务并发处理效率。
框架示意图:
源码说明
外部提交任务externalPush
//外部提交任务,此任务会提交到64个偶数槽中的任务队列其中一个,线程探针是通过提交线程生成的,
//通过线程探针生成队列数组索引,如果该索引下有队列则push到队列中,否则调用externalSubmit(), 即走完队列初始化全部流程加入到任务队列中
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;//获取当前线程的探针int r = ThreadLocalRandom.getProbe();//线程池运行状态int rs = runState;//线程池队列数组有效if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&//根据r,获取SQMASK 掩码64个槽位中的r位索引,判断索引位队列是否有效(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&//队列加锁U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;//判断队列中的任务数是否满了if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {//top偏移位int j = ((am & s) << ASHIFT) + ABASE;//放入任务到队列中U.putOrderedObject(a, j, task);//top指针加1U.putOrderedInt(q, QTOP, s + 1);//unlockU.putIntVolatile(q, QLOCK, 0);//任务数=1,如果正好任务被某个工作线程偷走,其他线程扫描时没扫描到处于空闲等待状态,//<1可能其他线程都在空闲等待中,所以这时通知空闲线程有任务要处理了if (n <= 1)signalWork(ws, q);return;}//unlockU.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);
}
外部提交任务externalSubmit
//外部任务提交初始化一套流程,初始线程池任务队列数组,初始化新的偶数槽位(最大64个槽位)队列//初始化完成后,如果提交任务成功,则通知工作线程处理任务。private void externalSubmit(ForkJoinTask<?> task) {int r; // initialize caller's probe//调用者线程探针初始化,然后再取随机探针if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;boolean move = false;//线程池被终止了(SHUTDOWN), 帮助关闭if ((rs = runState) < 0) {tryTerminate(false, false); // help terminatethrow new RejectedExecutionException();}//如果运行状态未开始,或者池化队列数组未初始化,则进行初始化else if ((rs & STARTED) == 0 || // initialize((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;//lockrs = lockRunState();try {//如果运行状态未开始if ((rs & STARTED) == 0) {//初始化偷取计数U.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of two//并行工作线程数int p = config & SMASK; // ensure at least 2 slots//并行数超出1,取 -1,否则取1int n = (p > 1) ? p - 1 : 1;//保证2的幂次方,这样就可以通过位运行取模n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;//new 队列数组workQueues = new WorkQueue[n];ns = STARTED;}} finally {//unlock 并加上运行初始化状态unlockRunState(rs, (rs & ~RSLOCK) | ns);}}//和SQMASK逻辑运算后,结果会变成偶数,只有64个糟位, 根据探针r取偶数索引位的队列else if ((q = ws[k = r & m & SQMASK]) != null) {//队列lockif (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;int s = q.top;boolean submitted = false; // initial submission or resizingtry { // locked version of push//队列任务数未满if ((a != null && a.length > s + 1 - q.base) ||//或者队列扩容成功(a = q.growArray()) != null) {//top在数组中的偏移量int j = (((a.length - 1) & s) << ASHIFT) + ABASE;//放入队列U.putOrderedObject(a, j, task);//top + 1U.putOrderedInt(q, QTOP, s + 1);submitted = true;}} finally {//unlockU.compareAndSwapInt(q, QLOCK, 1, 0);}//任务提交成功,通知工作线程处理任务if (submitted) {signalWork(ws, q);return;}}move = true; // move on failure}//如果线程池运行状态未被锁,创建新任务队列else if (((rs = runState) & RSLOCK) == 0) { // create new queueq = new WorkQueue(this, null);//记录探针值,q.hint = r;//配置成共享队列,所有的工作线程都可以偷取处理,此队列在偶数位,没有被工作线程拥有q.config = k | SHARED_QUEUE;//队列状态标记为未激活q.scanState = INACTIVE;//lockrs = lockRunState(); // publish indexif (rs > 0 && (ws = workQueues) != null &&k < ws.length && ws[k] == null)//队列放入线程池队列数组中ws[k] = q; // else terminated//unlockunlockRunState(rs, rs & ~RSLOCK);}elsemove = true; // move if busyif (move)r = ThreadLocalRandom.advanceProbe(r);}}
通知工作线程处理任务signalWork
//通知工作线程处理任务,如果工作线程未达到并行度,则创建新的工作线程。
//否则查看ctl栈顶处于空闲状态的工作线程,如果有,则唤醒
final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;//激活的线程量未达到总并发量while ((c = ctl) < 0L) { // too few active//ctl栈顶未有waitersif ((sp = (int)c) == 0) { // no idle workers//ctl 最高位1,<0,未达到并发量if ((c & ADD_WORKER) != 0L) // too few workers//增加工作线程tryAddWorker(c);break;}//工作线程数组为空,退出if (ws == null) // unstarted/terminatedbreak;//ctl栈顶waiter索引,超出了工作线程数组长度,?if (ws.length <= (i = sp & SMASK)) // terminatedbreak;//此索引的工作线程被撤销停止了if ((v = ws[i]) == null) // terminatingbreak;//增加ctl栈顶工作线程的版本号,并设置激活状态int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState//ctl栈顶状态和工作线程状态比较int d = sp - v.scanState; // screen CAS//激活线程数+1,栈顶状态工作线程以前的状态long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);//原子更新ctl, 如果更新不成功说明被其他线程激活了或者阻塞超时了if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {//更新工作线程带新版本的状态v.scanState = vs; // activate v//如果阻塞调用者不为空,则唤醒工作线程,并退出if ((p = v.parker) != null)U.unpark(p);break;}//如果队列,是没有数据,则退出if (q != null && q.base == q.top) // no more workbreak;}}
增加工作线程tryAddWorker
//尝试增加一个工作线程
private void tryAddWorker(long c) {boolean add = false;do {long nc = ((AC_MASK & (c + AC_UNIT)) | //激活线程数量+1(TC_MASK & (c + TC_UNIT))); //工作线程总数+1//ctl 没有变化if (ctl == c) {int rs, stop; // check if terminating//加锁,并判断线程池是否停止
这篇关于ForkJoin 框架 源码解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!