java 最终幻想2_Java 线程池(二)

2023-10-24 06:59
文章标签 java 线程 最终幻想

本文主要是介绍java 最终幻想2_Java 线程池(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

在上篇 Java 线程池(一)我们介绍了线程池中一些的重要参数和具体含义,这篇我们看一看在 Java 中是如何去实现线程池的,要想用好线程池,只知其然是远远不够的,我们需要深入实现源码去了解线程池的具体实现细节,这样才能更好的使用到我们的工作中,当出现问题时能快速找到问题根源所在。

线程池如何处理提交的任务

我们向线程池提交任务有两种方式,分别是通过 submit 方法提交和通过 execute 方法提交,这两种方式的区别为 execute 只能提交 Runnable 类型的任务并且没有返回值,而 submit 既能提交 Runnable 类型的任务也能提交 Callable(JDK 1.5+)类型的任务并且会有一个类型 Future 的返回值,我们知道 Runnable 是没有返回值的,所以只有当提交 Callable 类型的任务时才会有返回值,而提交 Runnable 的返回值是 null。execute 执行任务时,如果此时遇到异常会直接抛出,而 submit 不会直接抛出,只有在使用 Future 的 get 方法获取任务的返回结果时,才会抛出异常。通过查看 ThreadPoolExecutor 的源码我们发现,其 submit 方法是继承自其抽象父类 AbstractExecutorService 而来的,有三个重载的方法,分别可以提交 Runnable 类型和 Callable 类型的任务。无论是哪个 submit 方法最终还是调用了 execute 方法来实现的。方法源码如下:

public Future> submit(Runnable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task, null);

execute(ftask);

return ftask;

}

public Future submit(Runnable task, T result) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task, result);

execute(ftask);

return ftask;

}

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

首先对提交的任务进行判空指针后,三个方法都是调用 newTaskFor 方法把任务统一封装成 RunnableFuture 对象,然后把封装好的对象作为 execute 方法的入参去执行,而此时 execute 方法还未实现,这个方法是在 AbstractExecutorService 的继承类 ThreadPoolExecutor 中实现。下面看看 newTaskFor 方法是如何封装我们提交的任务的,两个重载方法的源码如下:

protected RunnableFuture newTaskFor(Runnable runnable, T value) {

return new FutureTask(runnable, value);

}

protected RunnableFuture newTaskFor(Callable callable) {

return new FutureTask(callable);

}

那么这个 FutureTask 是个什么东东呢,进入其源码发现它实现了 RunnableFuture 接口,而 RunnableFuture 接口的作用正如其名,它是 Runnable 和 Future 的结合体,表示一个能异步返回结果的线程。我们知道 Runnable 是不能返回结果的,所以上面第一个 newTaskFor(Runnable runnable, T value) 方法的第二个参数 value 的作用就是指定返回结果。其实最后也是通过 RunnableAdapter 把 Runnable 和 value 封装成 Callable 的。下面我们看看 execute 方法是怎么处理的,方法源码如下:

cdfbaa270bff608905ee1364d227d720.png

第 ① 步 获取当前的 ctl 值,在上篇 Java 线程池(一)中说过,变量 ctl 存储了线程池的工作状态 runState 和线程池中正在运行的线程数 workerCount。 第 ② 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果线程池当前工作线程数小于核心线程数 corePoolSize,则进行第 ③ 步。 第 ③ 步 通过 addWorker 方法新建一个线程加到线程池中,addWorker 方法的第二个参数如果为 true 则限制添加线程的数量是根据 corePoolSize 来判断,反之则根据 maximumPoolSize 来判断,并把任务添加到该线程中。 第 ④ 步 如果添加失败,则重新获取 ctl 的值。 第 ⑤ 步 如果当前线程池的状态是运行状态(state < SHUTDOWN)并且把任务成功添加到队列中。 第 ⑥ 步 重新获取 ctl 的值,再次判断线程池的运行状态,如果不是运行状态,要从队列中移除任务,因为到这一步了,意味着之前已经把任务成功添加到队列中了,所以需要从队列移除。移除成功后调用拒绝策略对任务进行处理,整个 execute 方法结束(PS:为什么不在入队列之前就先判断线程池的状态呢?因为判断一个线程池工作处于运行状态到执行入队列操作这段时间,线程池可能已经被其它线程关闭了,所以提前判断其实毫无意义)。 第 ⑦ 步 通过 workerCountOf 方法取出线程池中当前正在运行的线程数( ctl 低 29 位的值),如果是 0 则执行 addWorker(null, false) 方法,第一个参数为 null 表示只是在线程池中创建一个线程出来,但是没有立即启动,因为我们创建线程池时可能要求核心线程数量为 0。第二个参数为 false 表示限制添加线程时根据 maximumPoolSize 来判断,如果当前线程池中正在运行线程数量大于 0 ,则直接返回,因为在上面第 ⑤ 步已经把任务成功添加到队列 workQueue 中,它会在将来的某个时刻执行到。 第 ⑧ 步 如果执行到这个地方,只有两种情况,一种是线程池的状态已经不是运行状态了,另一种是线程池是运行状态,但是此时线程池的工作线程数大于等于核心线程数(workerCount >= corePoolSize)并且队列 workQueue 已满。这时会再次调用 addWorker 方法,第二个参数为 false,意味着限制添加线程的数量是根据 maximumPoolSize 来判断的,如果失败则调用拒绝策略对任务进行处理,整个 execute 方法结束。上面的 execute 方法中多次调用 addWorker,该方法的主要作用就是创建一个线程来执行任务。addWorker 的方法签名如下:Java addWorker(Runnable firstTask, boolean core)

第一个参数 firstTask 如果不为 null,则创建的线程首先执行 firstTask 任务,然后才会从队列中获取任务,否则会直接从队列中获取任务。第二个参数如果为 true,则表示限制添加线程时根据 corePoolSize 来判断,否则根据maximumPoolSize 来判断。我们看看 addWorker 方法的源码,方法源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

方法首先获取线程池 ctl 属性的值,该属性包含了线程池的运行状态和工作线程数,通过 runStateOf 获取线程池的运行状态,然后执行下面这个比较复杂的条件判断

58af66fcf5a23501902e98492dfee9d6.png

第 ① 个条件表示此时线程池已经不再接受新任务了,接下来的 ②、③、④ 三个判断条件只要有一个不满足,那么方法就会返回 false,方法结束。第 ② 个条件表示线程池为关闭状态,处于关闭状态的线程池不会处理新提交的任务,但会处理完已处理的任务,第 ③ 个条件为 firstTask 为 null,第 ④ 个条件为队列不为空。我们看看如果线程池此时为关闭状态的情况,这种情况线程池不会接受新提交的任务,所以此时如果传入的 firstTask 不为 null,则会直接返回 false;然后如果 firstTask 为 null,并且队列 workQueue 为空,此时也会返回 false,因为此时队列里已经没有任务了,那么也不需要再添加线程了,然后接下来会进入一个循环。

7a9c3c37091aef2b3cb0c5cb3ebb0e99.png

第 ① 步 调用 workerCountOf 方法获取当前线程池的工作线程数 第 ② 步 如果当前线程池的工作数大于 CAPACITY 也就是 ctl 的低 29 位的最大值,则返回 false,如果不大于 CAPACITY,然后根据 core (该方法的第二个参数)来判断是和 corePoolSize 比较还是和 maximumPoolSize 比较,如果比这个值大则返回 false。 第 ③ 步 使用 ctl 的 compareAndSet 原子方法尝试把工作线程数 workerCount + 1,如果增加成功,退出第一层循环。 第 ④ 步 如果增加线程池工作线程数失败,则重新获取 ctl 的值。 第 ⑤ 步 调用 runStateOf 获取线程池的状态,如果不等于方法前面获取的 rs,说明线程池的状态已经改变了,回到第一层循环继续执行。接下来会启动线程执行任务,源码如下:

4325bca8815d9f0adf8b0a29e1eb13b9.png

第 ① 步 根据 firstTask 创建 Worker 对象,每一个 Worker 对象都会创建一个线程,然后会使用重入锁 ReentrantLock 进行加锁操作。 第 ② 步 调用 runStateOf 获取线程池的状态,然后进行一个条件判断,第一个 rs < SHUTDOWN 表示线程池是运行状态。如果线程池是运行状态或者线程池是关闭状态并且 firstTask 为 null,那么就往线程池中加入线程(因为当线程池是 SHUTDOWN 状态时不会再向线程池添加新的任务,但会执行队列 workQueue 中的任务)。这里的 workers 是一个 HashSet,所以其 add 方法不是线程安全的,所以需要加锁操作。然后修改线程池中出现过的最大线程数量 largestPoolSize 记录和把是否添加成功标记 workerAdded 为 true 。如果 workerAdded 为 true 那么会启动线程并把线程是否启动标记 workerStarted 改为 true。 第 ③ 步 根据线程是否启动 workerStarted 标记来判断是否需要进行失败的操作。包含从 workers 移除当前的 worker、线程池的工作线程数减 1、尝试终端线程池。

线程池中线程是如何执行的

线程池的线程执行是调用 Worker 的 thread 属性的 start 方法,而 thread 的 run 方法实际上调用了 Worker 类的 runWorker 方法,所以我们直接来看看 runWorker 方法的源码:

5709d5bbcdae56497f215c23c7bfe51b.png

第 ① 步 获取第一个任务,while 循环不断地通过 getTask 方法从队列中获取任务。 第 ② 步 这个判断条件目的是要保证如果线程池正在停止,要保证当前线程是中断状态,如果是的话,要保证当前线程不是终端状态。 第 ③ 步 方法 beforeExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之前添加操作的方法。 第 ④ 步 执行 task.run() 执行任务(PS:这里为什么是调用 run 方法而不是调用 start 方法呢?我们知道当调用了 start 方法后操作系统才会给我们创建一个独立的线程来运行,而调用 run 方法只是一个普通的方法调用,而线程池正好就是需要它是一个普通的方法才能进行任务的调度。我们可以想象一下,假如这里是调用的 Runnable 的 start 方法,那么会是什么结果呢。如果我们往一个核心线程数、最大线程数为 3 的线程池里丢了 500 个任务,那么它会额外的创建 500 个线程,同时每个任务都是异步执行的,结果一下子就执行完毕了,根本无法对任务进行调度。从而没法做到由这 3 个 Worker 线程来调度这 1000 个任务,而只有当做一个普通的 run 方法调用时才能满足线程池的这个要求)。 第 ⑤ 步 方法 afterExecute 方法在类 ThreadPoolExecutor 中没有做任何操作,是留给子类去自定义在线程执行之后添加操作的方法。completedAbruptly 变量是用来表示在执行任务过程中是否出现了异常,processWorkerExit 方法中会对该变量的值进行判断。接下来我们看看 getTask 方法是如何从队列中获取任务的,方法源码如下:

ce228718ed2bd4b83ca6d0b72f772b86.png

第 ① 步 如果线程池不是运行状态,则判断线程池是否正在停止或者当前队列为空,如果条件满足将线程池的工作线程数减一并返回 null。因为如果当前线程池状态的值是 SHUTDOWN 或以上时,就不允许再向队列中添加任务了。 第 ② 步 这里的 timed 变量用来标记是否需要线程进行超时控制,allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时。wc > corePoolSize 表示当前线程池中的工作线程数量大于核心线程数量,对于超过核心线程数量的这些线程,需要进行超时控制。 第 ③ 步 第一个判断 wc > maximumPoolSize 如果成立是因为可能在此方法执行阶段同时执行了线程池的 setMaximumPoolSize 方法;第二个判断 timed && timedOut 如果成立表示当前操作需要进行超时控制,并且上次从队列中获取任务发生了超时(timeOut 变量的值表示上次从阻塞队列中取任务时是否超时);第三个判断 wc > 1 || workQueue.isEmpty() 如果线程池中工作线程数量大于 1,或者队列是空的,那么尝试将 workerCount 减一,如果减一失败,则返回重试。如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。 第 ④ 步 根据 timed 来判断,如果为 true,则通过阻塞队列的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null,否则通过 take 方法,如果这时队列为空,则 take 方法会阻塞直到队列不为空。如果 r == null,说明已经超时,timedOut 设置为 true。 第 ⑤ 步 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并重新循环重试。

关闭线程池

线程池的关闭一般都是使用 shutdown 方法和 shutdownNow 方法,两者的区别是前面的 shutdown 方法不会执行新的任务,但是会执行完当前正在执行的任务,而后面的 shutdownNow 方法会立即停止当前线程池,不管当前是否有线程在执行。一般都是使用 shutdown 方法来停止线程池,其方法源码如下:

public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

checkShutdownAccess();

advanceRunState(SHUTDOWN);

interruptIdleWorkers();

onShutdown(); // hook for ScheduledThreadPoolExecutor

} finally {

mainLock.unlock();

}

tryTerminate();

}

advanceRunState(SHUTDOWN) 方法的作用是通过 CAS 原子操作将线程池的状态更改为关闭状态。interruptIdleWorkers 方法是对空闲的线程进行中断,其实是调用重载带参数的函数 interruptIdleWorkers(false)。然后 onShutdown 方法和上文提到的 beforeExecute、afterExecute 方法一样,在类 ThreadPoolExecutor 是空实现,也是个钩子函数。我们看看 interruptIdleWorkers 的实现源码:

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) {

Thread t = w.thread;

if (!t.isInterrupted() && w.tryLock()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

先进行加锁操作,然后遍历 workers 容器,也就是遍历线程池中的线程,对每个线程进行 tryLock 操作,如果成功说明线程空闲,则设置其中断标志位。而线程是否响应中断则交给我们定义任务的人来决定。

总结

本文比较详细的分析了线程池任务的提交、线程的执行、线程池的关闭的工作流程。通过学习线程池相关的源码后,看到了在其内部用运用了很多多线程的解决方法,有如下几个方式:

1.通过定义重入锁 ReentrantLock 变量 mainLock 来解决并发多线程的安全问题

2.利用等待机制来实现线程之间的通讯问题

除了内置的功能外,ThreadPoolExecutor 也向外提供了两个接口供我们自己扩展满足我们需求的线程池,这两个接口分别是:beforeExecute 任务执行前执行的方法,afterExecute 任务执行结束后执行的方法。

这篇关于java 最终幻想2_Java 线程池(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/273478

相关文章

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.