深读源码-java线程系列之线程池深入解析——普通任务执行流程

本文主要是介绍深读源码-java线程系列之线程池深入解析——普通任务执行流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

前面我们一起学习了Java中线程池的体系结构、构造方法和生命周期,本章我们一起来学习线程池中普通任务到底是怎么执行的。

建议学习本章前先去复习之前《深读源码-java线程系列之自己手写一个线程池》这两章内容,有助于理解本章的内容,且那边的代码比较短小,学起来相对容易一些。

问题

(1)线程池中的普通任务是怎么执行的?

(2)任务又是在哪里被执行的?

(3)线程池中有哪些主要的方法?

(4)如何使用Debug模式一步一步调试线程池?

使用案例

我们创建一个线程池,它的核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话。

如果使用它运行20个任务,会是什么结果呢?

public class ThreadPoolTest01 {public static void main(String[] args) {// 新建一个线程池// 核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话ExecutorService threadPool = new ThreadPoolExecutor(5, 10,1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(currentThreadName() + ", discard task");}});// 提交20个任务,注意观察numfor (int i = 0; i < 20; i++) {int num = i;threadPool.execute(()->{try {System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}});}}private static String currentThreadName() {return Thread.currentThread().getName();}
}

构造方法的7个参数我们就不详细解释了,有兴趣的可以看看《深读源码-java线程系列之线程池深入解析——构造方法》这章。

我们一起来看看一次运行的结果:

pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412
main, discard task
main, discard task
main, discard task
main, discard task
main, discard taskpool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412

注意,观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,竟然不是按顺序打印的,为什么呢?

让我们一步一步debug进去查看。

execute()方法

execute()方法是线程池提交任务的方法之一,也是最核心的方法。

// 提交任务,任务并非立即执行,所以翻译成执行任务似乎不太合适
public void execute(Runnable command) {// 任务不能为空if (command == null)throw new NullPointerException();// 控制变量(高3位存储状态,低29位存储工作线程的数量)int c = ctl.get();// 1. 如果工作线程数量小于核心数量if (workerCountOf(c) < corePoolSize) {// 就添加一个工作线程(核心)if (addWorker(command, true))return;// 重新获取下控制变量c = ctl.get();}// 2. 如果达到了核心数量且线程池是运行状态,任务入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);// 容错检查工作线程数量是否为0,如果为0就创建一个else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 任务入队列失败,尝试创建非核心工作线程else if (!addWorker(command, false))// 非核心工作线程创建失败,执行拒绝策略reject(command);
}

关于线程池状态的内容,我们这里不拿出来细讲了,有兴趣的可以看看《深读源码-java线程系列之线程池深入解析——生命周期》这章。

提交任务的过程大致如下:

(1)工作线程数量小于核心数量,创建核心线程;

(2)达到核心数量,进入任务队列;

(3)任务队列满了,创建非核心线程;

(4)达到最大数量,执行拒绝策略;

其实,就是三道坎——核心数量、任务队列、最大数量,这样就比较好记了。

流程图大致如下:

threadpool_task

任务流转的过程我们知道了,但是任务是在哪里执行的呢?继续往下看。

addWorker()方法

这个方法主要用来创建一个工作线程,并启动之,其中会做线程池状态、工作线程数量等各种检测。

private boolean addWorker(Runnable firstTask, boolean core) {// 判断有没有资格创建新的工作线程// 主要是一些状态/数量的检查等等// 这段代码比较复杂,可以先跳过retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态检查if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 工作线程数量检查for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 数量加1并跳出循环if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建工作线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次检查线程池的状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 添加到工作线程队列workers.add(w);// 还在池子中的线程数量(只能在mainLock中使用)int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;// 标记线程添加成功workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 线程添加成功之后启动线程t.start();workerStarted = true;}}} finally {// 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

这里其实还没到任务执行的地方,上面我们可以看到线程是包含在Worker这个类中的,那么,我们就跟踪到这个类中看看。

Worker内部类

Worker内部类可以看作是对工作线程的包装,一般地,我们说工作线程就是指Worker,但实际上是指其维护的Thread实例。

// Worker继承自AQS,自带锁的属性
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{// 真正工作的线程final Thread thread;// 第一个任务,从构造方法传进来Runnable firstTask;// 完成任务数volatile long completedTasks;// 构造方法Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 使用线程工厂生成一个线程// 注意,这里把Worker本身作为Runnable传给线程this.thread = getThreadFactory().newThread(this);}// 实现Runnable的run()方法public void run() {// 调用ThreadPoolExecutor的runWorker()方法runWorker(this);}// 省略锁的部分
}

这里要能够看出来工作线程Thread启动的时候实际是调用的Worker的run()方法,进而调用的是ThreadPoolExecutor的runWorker()方法。

runWorker()方法

runWorker()方法是真正执行任务的地方。

final void runWorker(Worker w) {// 工作线程Thread wt = Thread.currentThread();// 任务Runnable task = w.firstTask;w.firstTask = null;// 强制释放锁(shutdown()里面有加锁)// 这里相当于无视那边的中断标记w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 取任务,如果有第一个任务,这里先执行第一个任务// 只要能取到任务,这就是个死循环// 正常来说getTask()返回的任务是不可能为空的,因为前面execute()方法是有空判断的// 那么,getTask()什么时候才会返回空任务呢?while (task != null || (task = getTask()) != null) {w.lock();// 检查线程池的状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 钩子方法,方便子类在任务执行前做一些处理beforeExecute(wt, task);Throwable thrown = null;try {// 真正任务执行的地方task.run();// 异常处理} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 钩子方法,方便子类在任务执行后做一些处理afterExecute(task, thrown);}} finally {// task置为空,重新从队列中取task = null;// 完成任务数加1w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 到这里肯定是上面的while循环退出了processWorkerExit(w, completedAbruptly);}
}

这个方法比较简单,忽略状态检测和锁的内容,如果有第一个任务,就先执行之,之后再从任务队列中取任务来执行,获取任务是通过getTask()来进行的。

getTask()

从队列中获取任务的方法,里面包含了对线程池状态、空闲时间等的控制。

private Runnable getTask() {// 是否超时boolean timedOut = false;// 死循环for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态是SHUTDOWN的时候会把队列中的任务执行完直到队列为空// 线程池状态是STOP时立即退出if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 工作线程数量int wc = workerCountOf(c);// 是否允许超时,有两种情况:// 1. 是允许核心线程数超时,这种就是说所有的线程都可能超时// 2. 是工作线程数大于了核心数量,这种肯定是允许超时的// 注意,非核心线程是一定允许超时的,这里的超时其实是指取任务超时boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 超时判断(还包含一些容错判断)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 超时了,减少工作线程数量,并返回nullif (compareAndDecrementWorkerCount(c))return null;// 减少工作线程数量失败,则重试continue;}try {// 真正取任务的地方// 默认情况下,只有当工作线程数量大于核心线程数量时,才会调用poll()方法触发超时调用Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 取到任务了就正常返回if (r != null)return r;// 没取到任务表明超时了,回到continue那个if中返回nulltimedOut = true;} catch (InterruptedException retry) {// 捕获到了中断异常// 中断标记是在调用shutDown()或者shutDownNow()的时候设置进去的// 此时,会回到for循环的第一个if处判断状态是否要返回nulltimedOut = false;}}
}

注意,这里取任务会根据工作线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。

poll(timeout, unit)方法会在超时时返回null,如果timeout<=0,队列为空时直接返回null。

take()方法会一直阻塞直到取到任务或抛出中断异常。

所以,如果keepAliveTime设置为0,当任务队列为空时,非核心线程取不出来任务,会立即结束其生命周期。

默认情况下,是不允许核心线程超时的,但是可以通过下面这个方法设置使核心线程也可超时。

public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)throw new IllegalArgumentException("Core threads must have nonzero keep alive times");if (value != allowCoreThreadTimeOut) {allowCoreThreadTimeOut = value;if (value)interruptIdleWorkers();}
}

至此,线程池中任务的执行流程就结束了。

注:

1、当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的 最长时间,超过这个时间后多余的线程将被终止。这里把 keepAliveTime 设置为 0L,意味着多余 的空闲线程会被立即终止。

2、如果要设置核心线程的回收,则需要设置 executor.allowCoreThreadTimeOut(true),但这时 keepAliveTime 必须要 >0 才行,否则会抛出异常(throw new IllegalArgumentException("Core threads must have nonzero keep alive times");)

再看开篇问题

观察num值的打印信息,先是打印了0~4,再打印了10~14,最后打印了5~9,竟然不是按顺序打印的,为什么呢?

线程池的参数:核心数量5个,最大数量10个,任务队列5个。

答:执行前5个任务执行时,正好还不到核心数量,所以新建核心线程并执行了他们;

执行中间的5个任务时,已达到核心数量,所以他们先入队列;

执行后面5个任务时,已达核心数量且队列已满,所以新建非核心线程并执行了他们;

再执行最后5个任务时,线程池已达到满负荷状态,所以执行了拒绝策略。

总结

本章通过一个例子并结合线程池的重要方法我们一起分析了线程池中普通任务执行的流程。

(1)execute(),提交任务的方法,根据核心数量、任务队列大小、最大数量,分成四种情况判断任务应该往哪去;

(2)addWorker(),添加工作线程的方法,通过Worker内部类封装一个Thread实例维护工作线程的执行;

(3)runWorker(),真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;

(4)getTask(),真正从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的poll()还是take()方法,keepAliveTime参数也是在这里使用的。

彩蛋

核心线程和非核心线程有什么区别?

答:实际上并没有什么区别,主要是根据corePoolSize来判断任务该去哪里,两者在执行任务的过程中并没有任何区别。有可能新建的时候是核心线程,而keepAliveTime时间到了结束了的也可能是刚开始创建的核心线程。

Worker继承自AQS有何意义?

前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?

答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。

private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}

interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。

一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得《深读源码-java线程系列之线程池深入解析——生命周期》中说过的shutdown()和shutdownNow()方法吗?

观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法,这里tryLock()获取到锁了再中断,如果没有获取到锁则不中断,没获取到锁只有一种情况,也就是lock()所在的地方,也就是有任务正在执行。

而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。

所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制shutdown()时不要中断正在执行任务的线程。


原文链接:https://www.cnblogs.com/tong-yuan/p/11787395.html

这篇关于深读源码-java线程系列之线程池深入解析——普通任务执行流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/362644

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06