ThreadPoolExecutor的实现机制

2024-05-28 17:32

本文主要是介绍ThreadPoolExecutor的实现机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:《ThreadPoolExecutor的实现机制》
1、什么是ThreadPoolExecutor
ThreadPoolExecutor是一个 ExecutorService一个具体实现,在实际项目中,主要使用ThreadPoolExecutor维护的线程队列中的任意一个空闲线程去执行每个提交任务。说的直白点就是在实际项目中,没有办法为每个提交的任务立马分配一个线程,所以在程序中维护一个数量一定的线程集合来应对提交的任务。ThreadPoolExecutor是对指定数量线程资源和待处理任务队列的维护。

2、为什么要使用ThreadPoolExecutor
ThreadPoolExecutor(线程池)主要有一些优势:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3、ThreadPoolExecutor具体实现
ThreadPoolExecutor提交任务的大致流程图如下:

如上所示的流程图有一点需要说明,在添加任务到任务队列如果出现失败,会当成当前任务队列已满情况来处理,尝试创建新线程来执行任务。流程中的具体的实现代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn’t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在使用execute方法执行任务时,首先通过ctl获取当前的值,ctl是一个AtomicInteger对象,初始值为-536870912,通过workerCountOf确定当前线程池中的线程数。

线程数小于线程池的核心线程数,则执行创建线程并执行任务的流程(addWorker(command, true)),流程代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

在 C 值没有超过0时,workerCountOf(int C)函数返回的值为(C-(-RUNNING)),runStateOf(int C)函数的返回值为min(C,RUNNING),所以在第一个for循环中,程序不会进入if语句中,在第二个for循环中,由于当前函数传入的core为true,表明添加的线程为核心线程,程序值判断当前线程数小于核心线程数,ctl自增1,并跳出for循环开始创建工作线程。这里使用了ReentrantLock进行了加锁操作。在整个锁部分主要执行创建工作线程、添加工作线程到线程集合中、更新线程池当前线程数、工作线程启动执行任务,返回当前添加线程是否成功,成功直接返回退出execute方法。

当前线程数大于等于线程池核心线程数时,添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
isRunning方法判断当前入参是否小于0,条件显然时满足的,接着执行workQueue.offer(command)操作,将当前任务添加至线程池的任务队列中,后续的if条件都不满足,整个execute的调用只是将任务添加到线程池的任务队列中。

在任务队列已满的情况下添加任务队列, 添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
由于任务队列已经满了,所以添加任务到任务队列中失败,返回false,执行else if语句,调用addWorker(command, false)方法,执行相关流程,流程实现代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;

        // Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

}
判断当前线程池的wc即线程数是否大于最大线程数,如果不大于则之间更新当前线程池的线程数量,然后创建新的工作线程,执行任务。

在线程池任务队列已满,并且线程数已经达到线程池的最大线程数时,再次往线程池提交任务,线程池会根据线程池设置的拒绝策略来处理任务,即addWorker方法返回false,在execute中执行reject(command)方法,具体代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
直接使用初始化线程池设置的handler来处理任务。

工作线程如何从任务队列中获取任务,每个工作线程都会执行自己的run方法,具体的方法实现如下:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在代码中存在while (task != null || (task = getTask()) != null)语句,这里只有当task为空时,才会正常结束当前线程。在整个任务执行的过程中,存在连个hook函数,beforeExecute和afterExecute通过方法的重写可以在方法中对task信息进行预处理等。

4、 关于ThreadPoolExecutor的总结
在向线程池提交任务时,可能会触发创建新的工作线程。如果是在高并发的情况下会不会造成创建超过线程池规定的数量的线程。因为在代码中通过ctl获取值,在到ctl自增更新线程数量的过程可能会有其他的线程也执行到了这段代码,都会通过线程数量限制条件的判断,最终都会创建新线程执行任务,具体代码片段如下:
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;
//假设线程A和线程B同时执行这段代码,其中当前的线程池数量为最
//大线程数-1,但是线程A和线程B进行线程数量判断时都可以通过,
//随后都会进入新建工作线程执行任务的流程。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

        for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;

线程池并不能节省任务执行的时间,使用线程池只能节省线程的创建和回收时间。
如果项目中需要频繁创建任务来处理大量的短耗时任务,那么使用线程池是一个不错的选择。如果是非常耗时的任务,可能在初始化线程池的时候需要计算好线程池的核心线程数量以防止任务长时间得不到执行。

这篇关于ThreadPoolExecutor的实现机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011218

相关文章

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

浏览器插件cursor实现自动注册、续杯的详细过程

《浏览器插件cursor实现自动注册、续杯的详细过程》Cursor简易注册助手脚本通过自动化邮箱填写和验证码获取流程,大大简化了Cursor的注册过程,它不仅提高了注册效率,还通过友好的用户界面和详细... 目录前言功能概述使用方法安装脚本使用流程邮箱输入页面验证码页面实战演示技术实现核心功能实现1. 随机