ThreadPoolExecutor的实现机制

2024-05-28 17:32

本文主要是介绍ThreadPoolExecutor的实现机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:《ThreadPoolExecutor的实现机制》
1、什么是ThreadPoolExecutor
ThreadPoolExecutor是一个 ExecutorService一个具体实现,在实际项目中,主要使用ThreadPoolExecutor维护的线程队列中的任意一个空闲线程去执行每个提交任务。说的直白点就是在实际项目中,没有办法为每个提交的任务立马分配一个线程,所以在程序中维护一个数量一定的线程集合来应对提交的任务。ThreadPoolExecutor是对指定数量线程资源和待处理任务队列的维护。

2、为什么要使用ThreadPoolExecutor
ThreadPoolExecutor(线程池)主要有一些优势:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
3、ThreadPoolExecutor具体实现
ThreadPoolExecutor提交任务的大致流程图如下:

如上所示的流程图有一点需要说明,在添加任务到任务队列如果出现失败,会当成当前任务队列已满情况来处理,尝试创建新线程来执行任务。流程中的具体的实现代码如下:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn’t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf© < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在使用execute方法执行任务时,首先通过ctl获取当前的值,ctl是一个AtomicInteger对象,初始值为-536870912,通过workerCountOf确定当前线程池中的线程数。

线程数小于线程池的核心线程数,则执行创建线程并执行任务的流程(addWorker(command, true)),流程代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

在 C 值没有超过0时,workerCountOf(int C)函数返回的值为(C-(-RUNNING)),runStateOf(int C)函数的返回值为min(C,RUNNING),所以在第一个for循环中,程序不会进入if语句中,在第二个for循环中,由于当前函数传入的core为true,表明添加的线程为核心线程,程序值判断当前线程数小于核心线程数,ctl自增1,并跳出for循环开始创建工作线程。这里使用了ReentrantLock进行了加锁操作。在整个锁部分主要执行创建工作线程、添加工作线程到线程集合中、更新线程池当前线程数、工作线程启动执行任务,返回当前添加线程是否成功,成功直接返回退出execute方法。

当前线程数大于等于线程池核心线程数时,添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
isRunning方法判断当前入参是否小于0,条件显然时满足的,接着执行workQueue.offer(command)操作,将当前任务添加至线程池的任务队列中,后续的if条件都不满足,整个execute的调用只是将任务添加到线程池的任务队列中。

在任务队列已满的情况下添加任务队列, 添加新任务流程具体逻辑实现如下:

if (isRunning© && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
由于任务队列已经满了,所以添加任务到任务队列中失败,返回false,执行else if语句,调用addWorker(command, false)方法,执行相关流程,流程实现代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;

        // Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;

}
判断当前线程池的wc即线程数是否大于最大线程数,如果不大于则之间更新当前线程池的线程数量,然后创建新的工作线程,执行任务。

在线程池任务队列已满,并且线程数已经达到线程池的最大线程数时,再次往线程池提交任务,线程池会根据线程池设置的拒绝策略来处理任务,即addWorker方法返回false,在execute中执行reject(command)方法,具体代码如下:

final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
直接使用初始化线程池设置的handler来处理任务。

工作线程如何从任务队列中获取任务,每个工作线程都会执行自己的run方法,具体的方法实现如下:

public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在代码中存在while (task != null || (task = getTask()) != null)语句,这里只有当task为空时,才会正常结束当前线程。在整个任务执行的过程中,存在连个hook函数,beforeExecute和afterExecute通过方法的重写可以在方法中对task信息进行预处理等。

4、 关于ThreadPoolExecutor的总结
在向线程池提交任务时,可能会触发创建新的工作线程。如果是在高并发的情况下会不会造成创建超过线程池规定的数量的线程。因为在代码中通过ctl获取值,在到ctl自增更新线程数量的过程可能会有其他的线程也执行到了这段代码,都会通过线程数量限制条件的判断,最终都会创建新线程执行任务,具体代码片段如下:
retry:
for (;? {
int c = ctl.get();
int rs = runStateOf©;
//假设线程A和线程B同时执行这段代码,其中当前的线程池数量为最
//大线程数-1,但是线程A和线程B进行线程数量判断时都可以通过,
//随后都会进入新建工作线程执行任务的流程。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

        for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;

线程池并不能节省任务执行的时间,使用线程池只能节省线程的创建和回收时间。
如果项目中需要频繁创建任务来处理大量的短耗时任务,那么使用线程池是一个不错的选择。如果是非常耗时的任务,可能在初始化线程池的时候需要计算好线程池的核心线程数量以防止任务长时间得不到执行。

这篇关于ThreadPoolExecutor的实现机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1011218

相关文章

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

基于Python实现高效PPT转图片工具

《基于Python实现高效PPT转图片工具》在日常工作中,PPT是我们常用的演示工具,但有时候我们需要将PPT的内容提取为图片格式以便于展示或保存,所以本文将用Python实现PPT转PNG工具,希望... 目录1. 概述2. 功能使用2.1 安装依赖2.2 使用步骤2.3 代码实现2.4 GUI界面3.效

MySQL更新某个字段拼接固定字符串的实现

《MySQL更新某个字段拼接固定字符串的实现》在MySQL中,我们经常需要对数据库中的某个字段进行更新操作,本文就来介绍一下MySQL更新某个字段拼接固定字符串的实现,感兴趣的可以了解一下... 目录1. 查看字段当前值2. 更新字段拼接固定字符串3. 验证更新结果mysql更新某个字段拼接固定字符串 -

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.

Redis实现延迟任务的三种方法详解

《Redis实现延迟任务的三种方法详解》延迟任务(DelayedTask)是指在未来的某个时间点,执行相应的任务,本文为大家整理了三种常见的实现方法,感兴趣的小伙伴可以参考一下... 目录1.前言2.Redis如何实现延迟任务3.代码实现3.1. 过期键通知事件实现3.2. 使用ZSet实现延迟任务3.3

基于Python和MoviePy实现照片管理和视频合成工具

《基于Python和MoviePy实现照片管理和视频合成工具》在这篇博客中,我们将详细剖析一个基于Python的图形界面应用程序,该程序使用wxPython构建用户界面,并结合MoviePy、Pill... 目录引言项目概述代码结构分析1. 导入和依赖2. 主类:PhotoManager初始化方法:__in

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码