ThreadPoolExecutor的实现机制

2024-05-28 17:32

本文主要是介绍ThreadPoolExecutor的实现机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:《ThreadPoolExecutor的实现机制》
1、什么是ThreadPoolExecutor
ThreadPoolExecutor是一个 ExecutorService一个具体实现,在实际项目中,主要使用ThreadPoolExecutor维护的线程队列中的任意一个空闲线程去执行每个提交任务。说的直白点就是在实际项目中,没有办法为每个提交的任务立马分配一个线程,所以在程序中维护一个数量一定的线程集合来应对提交的任务。ThreadPoolExecutor是对指定数量线程资源和待处理任务队列的维护。

2、为什么要使用ThreadPoolExecutor
ThreadPoolExecutor(线程池)主要有一些优势:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3、ThreadPoolExecutor具体实现
ThreadPoolExecutor提交任务的大致流程图如下:

如上所示的流程图有一点需要说明,在添加任务到任务队列如果出现失败,会当成当前任务队列已满情况来处理,尝试创建新线程来执行任务。流程中的具体的实现代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn’t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在使用execute方法执行任务时,首先通过ctl获取当前的值,ctl是一个AtomicInteger对象,初始值为-536870912,通过workerCountOf确定当前线程池中的线程数。

线程数小于线程池的核心线程数,则执行创建线程并执行任务的流程(addWorker(command, true)),流程代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

在 C 值没有超过0时,workerCountOf(int C)函数返回的值为(C-(-RUNNING)),runStateOf(int C)函数的返回值为min(C,RUNNING),所以在第一个for循环中,程序不会进入if语句中,在第二个for循环中,由于当前函数传入的core为true,表明添加的线程为核心线程,程序值判断当前线程数小于核心线程数,ctl自增1,并跳出for循环开始创建工作线程。这里使用了ReentrantLock进行了加锁操作。在整个锁部分主要执行创建工作线程、添加工作线程到线程集合中、更新线程池当前线程数、工作线程启动执行任务,返回当前添加线程是否成功,成功直接返回退出execute方法。

当前线程数大于等于线程池核心线程数时,添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
isRunning方法判断当前入参是否小于0,条件显然时满足的,接着执行workQueue.offer(command)操作,将当前任务添加至线程池的任务队列中,后续的if条件都不满足,整个execute的调用只是将任务添加到线程池的任务队列中。

在任务队列已满的情况下添加任务队列, 添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
由于任务队列已经满了,所以添加任务到任务队列中失败,返回false,执行else if语句,调用addWorker(command, false)方法,执行相关流程,流程实现代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;

        // Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

}
判断当前线程池的wc即线程数是否大于最大线程数,如果不大于则之间更新当前线程池的线程数量,然后创建新的工作线程,执行任务。

在线程池任务队列已满,并且线程数已经达到线程池的最大线程数时,再次往线程池提交任务,线程池会根据线程池设置的拒绝策略来处理任务,即addWorker方法返回false,在execute中执行reject(command)方法,具体代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
直接使用初始化线程池设置的handler来处理任务。

工作线程如何从任务队列中获取任务,每个工作线程都会执行自己的run方法,具体的方法实现如下:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在代码中存在while (task != null || (task = getTask()) != null)语句,这里只有当task为空时,才会正常结束当前线程。在整个任务执行的过程中,存在连个hook函数,beforeExecute和afterExecute通过方法的重写可以在方法中对task信息进行预处理等。

4、 关于ThreadPoolExecutor的总结
在向线程池提交任务时,可能会触发创建新的工作线程。如果是在高并发的情况下会不会造成创建超过线程池规定的数量的线程。因为在代码中通过ctl获取值,在到ctl自增更新线程数量的过程可能会有其他的线程也执行到了这段代码,都会通过线程数量限制条件的判断,最终都会创建新线程执行任务,具体代码片段如下:
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;
//假设线程A和线程B同时执行这段代码,其中当前的线程池数量为最
//大线程数-1,但是线程A和线程B进行线程数量判断时都可以通过,
//随后都会进入新建工作线程执行任务的流程。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

        for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;

线程池并不能节省任务执行的时间,使用线程池只能节省线程的创建和回收时间。
如果项目中需要频繁创建任务来处理大量的短耗时任务,那么使用线程池是一个不错的选择。如果是非常耗时的任务,可能在初始化线程池的时候需要计算好线程池的核心线程数量以防止任务长时间得不到执行。

这篇关于ThreadPoolExecutor的实现机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011218

相关文章

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja