ThreadPoolExecutor的实现机制

2024-05-28 17:32

本文主要是介绍ThreadPoolExecutor的实现机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:《ThreadPoolExecutor的实现机制》
1、什么是ThreadPoolExecutor
ThreadPoolExecutor是一个 ExecutorService一个具体实现,在实际项目中,主要使用ThreadPoolExecutor维护的线程队列中的任意一个空闲线程去执行每个提交任务。说的直白点就是在实际项目中,没有办法为每个提交的任务立马分配一个线程,所以在程序中维护一个数量一定的线程集合来应对提交的任务。ThreadPoolExecutor是对指定数量线程资源和待处理任务队列的维护。

2、为什么要使用ThreadPoolExecutor
ThreadPoolExecutor(线程池)主要有一些优势:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3、ThreadPoolExecutor具体实现
ThreadPoolExecutor提交任务的大致流程图如下:

如上所示的流程图有一点需要说明,在添加任务到任务队列如果出现失败,会当成当前任务队列已满情况来处理,尝试创建新线程来执行任务。流程中的具体的实现代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn’t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在使用execute方法执行任务时,首先通过ctl获取当前的值,ctl是一个AtomicInteger对象,初始值为-536870912,通过workerCountOf确定当前线程池中的线程数。

线程数小于线程池的核心线程数,则执行创建线程并执行任务的流程(addWorker(command, true)),流程代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

在 C 值没有超过0时,workerCountOf(int C)函数返回的值为(C-(-RUNNING)),runStateOf(int C)函数的返回值为min(C,RUNNING),所以在第一个for循环中,程序不会进入if语句中,在第二个for循环中,由于当前函数传入的core为true,表明添加的线程为核心线程,程序值判断当前线程数小于核心线程数,ctl自增1,并跳出for循环开始创建工作线程。这里使用了ReentrantLock进行了加锁操作。在整个锁部分主要执行创建工作线程、添加工作线程到线程集合中、更新线程池当前线程数、工作线程启动执行任务,返回当前添加线程是否成功,成功直接返回退出execute方法。

当前线程数大于等于线程池核心线程数时,添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
isRunning方法判断当前入参是否小于0,条件显然时满足的,接着执行workQueue.offer(command)操作,将当前任务添加至线程池的任务队列中,后续的if条件都不满足,整个execute的调用只是将任务添加到线程池的任务队列中。

在任务队列已满的情况下添加任务队列, 添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
由于任务队列已经满了,所以添加任务到任务队列中失败,返回false,执行else if语句,调用addWorker(command, false)方法,执行相关流程,流程实现代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;

        // Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

}
判断当前线程池的wc即线程数是否大于最大线程数,如果不大于则之间更新当前线程池的线程数量,然后创建新的工作线程,执行任务。

在线程池任务队列已满,并且线程数已经达到线程池的最大线程数时,再次往线程池提交任务,线程池会根据线程池设置的拒绝策略来处理任务,即addWorker方法返回false,在execute中执行reject(command)方法,具体代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
直接使用初始化线程池设置的handler来处理任务。

工作线程如何从任务队列中获取任务,每个工作线程都会执行自己的run方法,具体的方法实现如下:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在代码中存在while (task != null || (task = getTask()) != null)语句,这里只有当task为空时,才会正常结束当前线程。在整个任务执行的过程中,存在连个hook函数,beforeExecute和afterExecute通过方法的重写可以在方法中对task信息进行预处理等。

4、 关于ThreadPoolExecutor的总结
在向线程池提交任务时,可能会触发创建新的工作线程。如果是在高并发的情况下会不会造成创建超过线程池规定的数量的线程。因为在代码中通过ctl获取值,在到ctl自增更新线程数量的过程可能会有其他的线程也执行到了这段代码,都会通过线程数量限制条件的判断,最终都会创建新线程执行任务,具体代码片段如下:
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;
//假设线程A和线程B同时执行这段代码,其中当前的线程池数量为最
//大线程数-1,但是线程A和线程B进行线程数量判断时都可以通过,
//随后都会进入新建工作线程执行任务的流程。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

        for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;

线程池并不能节省任务执行的时间,使用线程池只能节省线程的创建和回收时间。
如果项目中需要频繁创建任务来处理大量的短耗时任务,那么使用线程池是一个不错的选择。如果是非常耗时的任务,可能在初始化线程池的时候需要计算好线程池的核心线程数量以防止任务长时间得不到执行。

这篇关于ThreadPoolExecutor的实现机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011218

相关文章

C++对象布局及多态实现探索之内存布局(整理的很多链接)

本文通过观察对象的内存布局,跟踪函数调用的汇编代码。分析了C++对象内存的布局情况,虚函数的执行方式,以及虚继承,等等 文章链接:http://dev.yesky.com/254/2191254.shtml      论C/C++函数间动态内存的传递 (2005-07-30)   当你涉及到C/C++的核心编程的时候,你会无止境地与内存管理打交道。 文章链接:http://dev.yesky

通过SSH隧道实现通过远程服务器上外网

搭建隧道 autossh -M 0 -f -D 1080 -C -N user1@remotehost##验证隧道是否生效,查看1080端口是否启动netstat -tuln | grep 1080## 测试ssh 隧道是否生效curl -x socks5h://127.0.0.1:1080 -I http://www.github.com 将autossh 设置为服务,隧道开机启动

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测

时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测 目录 时序预测 | MATLAB实现LSTM时间序列未来多步预测-递归预测基本介绍程序设计参考资料 基本介绍 MATLAB实现LSTM时间序列未来多步预测-递归预测。LSTM是一种含有LSTM区块(blocks)或其他的一种类神经网络,文献或其他资料中LSTM区块可能被描述成智能网络单元,因为

vue项目集成CanvasEditor实现Word在线编辑器

CanvasEditor实现Word在线编辑器 官网文档:https://hufe.club/canvas-editor-docs/guide/schema.html 源码地址:https://github.com/Hufe921/canvas-editor 前提声明: 由于CanvasEditor目前不支持vue、react 等框架开箱即用版,所以需要我们去Git下载源码,拿到其中两个主

android一键分享功能部分实现

为什么叫做部分实现呢,其实是我只实现一部分的分享。如新浪微博,那还有没去实现的是微信分享。还有一部分奇怪的问题:我QQ分享跟QQ空间的分享功能,我都没配置key那些都是原本集成就有的key也可以实现分享,谁清楚的麻烦详解下。 实现分享功能我们可以去www.mob.com这个网站集成。免费的,而且还有短信验证功能。等这分享研究完后就研究下短信验证功能。 开始实现步骤(新浪分享,以下是本人自己实现

基于Springboot + vue 的抗疫物质管理系统的设计与实现

目录 📚 前言 📑摘要 📑系统流程 📚 系统架构设计 📚 数据库设计 📚 系统功能的具体实现    💬 系统登录注册 系统登录 登录界面   用户添加  💬 抗疫列表展示模块     区域信息管理 添加物资详情 抗疫物资列表展示 抗疫物资申请 抗疫物资审核 ✒️ 源码实现 💖 源码获取 😁 联系方式 📚 前言 📑博客主页:

探索蓝牙协议的奥秘:用ESP32实现高质量蓝牙音频传输

蓝牙(Bluetooth)是一种短距离无线通信技术,广泛应用于各种电子设备之间的数据传输。自1994年由爱立信公司首次提出以来,蓝牙技术已经经历了多个版本的更新和改进。本文将详细介绍蓝牙协议,并通过一个具体的项目——使用ESP32实现蓝牙音频传输,来展示蓝牙协议的实际应用及其优点。 蓝牙协议概述 蓝牙协议栈 蓝牙协议栈是蓝牙技术的核心,定义了蓝牙设备之间如何进行通信。蓝牙协议

Linux系统稳定性的奥秘:探究其背后的机制与哲学

在计算机操作系统的世界里,Linux以其卓越的稳定性和可靠性著称,成为服务器、嵌入式系统乃至个人电脑用户的首选。那么,是什么造就了Linux如此之高的稳定性呢?本文将深入解析Linux系统稳定性的几个关键因素,揭示其背后的技术哲学与实践。 1. 开源协作的力量Linux是一个开源项目,意味着任何人都可以查看、修改和贡献其源代码。这种开放性吸引了全球成千上万的开发者参与到内核的维护与优化中,形成了

python实现最简单循环神经网络(RNNs)

Recurrent Neural Networks(RNNs) 的模型: 上图中红色部分是输入向量。文本、单词、数据都是输入,在网络里都以向量的形式进行表示。 绿色部分是隐藏向量。是加工处理过程。 蓝色部分是输出向量。 python代码表示如下: rnn = RNN()y = rnn.step(x) # x为输入向量,y为输出向量 RNNs神经网络由神经元组成, python

Spring中事务的传播机制

一、前言 首先事务传播机制解决了什么问题 Spring 事务传播机制是包含多个事务的方法在相互调用时,事务是如何在这些方法间传播的。 事务的传播级别有 7 个,支持当前事务的:REQUIRED、SUPPORTS、MANDATORY; 不支持当前事务的:REQUIRES_NEW、NOT_SUPPORTED、NEVER,以及嵌套事务 NESTED,其中 REQUIRED 是默认的事务传播级别。