线程池原理--执行器ThreadPoolExecutor

2024-06-15 00:58

本文主要是介绍线程池原理--执行器ThreadPoolExecutor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 线程池原理--执行器ThreadPoolExecutor
    • 属性
    • 构造器
      • 构造器参数介绍
    • execute()方法

线程池原理–总索引

线程池原理–执行器ThreadPoolExecutor

ThreadPoolExecutor 是Executor的核心实现类。

属性

  • 线程池运行状态
    • RUNNING:接受新的任务,并且处理队列中的任务
    • SHUTDOWN: 不接受新的任务,但是仍然处理队列中的任务
    • STOP: 不接受新的任务,也不处理队列中的任务
    • TIDYING: 所有的任务已经结束, workerCount 为0,程序会调用钩子方法
      terminated(),这个什么也没做。
    • TERMINATED: 所有的任务都已经完成。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

在这里插入图片描述

  • AtomicInteger ctl
    原子整形包装的线程池控制状态(The main pool control state)。
    在这里插入图片描述
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs :高3位,runState , wc : 低29位,workerCount,线程池中当前活动的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • CAPACITY 线程池的最大容量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  • 任务队列,用于存放执行未结束的任务。详细看线程池原理–任务队列BlockingQueue
private final BlockingQueue<Runnable> workQueue;
  • ReentrantLock 用于对于工作集和related bookkeeping(不知道啥意思)的并发访问锁控制。
private final ReentrantLock mainLock = new ReentrantLock();
  • 工作线程集,Worker是ThreadPoolExecutor类的内部类,并实现了Runnable接口,用户提交的任务都是给Worker线程执行。
private final HashSet<Worker> workers = new HashSet<>();
  • Tracks largest attained pool size. Accessed only under mainLock.
private int largestPoolSize;
private final Condition termination = mainLock.newCondition();
  • 完成任务的计数器,当任务结束时更新。
private long completedTaskCount;
  • 创建新线程的工厂
private volatile ThreadFactory threadFactory;
  • 拒绝策略,详情察看
    线程池原理–拒绝策略之RejectedExecutionHandler类
private volatile RejectedExecutionHandler handler;
  • 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  • 当 allowCoreThreadTimeOut设置为true.
    如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
    否则,就一直等到有新的任务执行。
 private volatile long keepAliveTime;
  • true:核心线程即使空闲也不会被销毁掉。
  • false:核心线程空闲超过keepAliveTime定义的超时时间,则会被销毁掉。
private volatile boolean allowCoreThreadTimeOut;
  • 线程池核心线程大小
private volatile int corePoolSize;
  • 最大的线程池大小 = 核心线程大小 + 非核心线程大小
private volatile int maximumPoolSize;
  • 运行时权限

 private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");
  • 调用finalize会用到
 private final AccessControlContext acc;this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();

构造器

 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

构造器参数介绍

下面来解释下各个参数:

  • int corePoolSize:该线程池中核心线程数最大值
    核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程,核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
    如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉。

  • int maximumPoolSize: 该线程池中线程总数最大值
    线程总数 = 核心线程数 + 非核心线程数。

  • long keepAliveTime
    当 allowCoreThreadTimeOut设置为true.
    如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
    否则,就一直等到有新的任务执行。

  • TimeUnit unit:keepAliveTime的单位
    TimeUnit是一个枚举类型,其包括:
    NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    MICROSECONDS : 1微秒 = 1毫秒 / 1000
    MILLISECONDS : 1毫秒 = 1秒 /1000
    SECONDS : 秒
    MINUTES : 分
    HOURS : 小时
    DAYS : 天

  • BlockingQueue ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

    • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
    • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue: 一个不存储元素的阻塞队列。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
  • ThreadFactory threadFactory:线程工厂,用于创建线程执行 我们提交的任务。

  • RejectedExecutionHandler handler:这个指定当队列满时继续添加任务该u做如何处理,详细可以看线程池原理–拒绝策略之RejectedExecutionHandler类。

execute()方法

execute()方法用于提交任务。

//提交的Runnable接口的实现类
//实际用户可以提交Runnable接口的实现类或者Callable接口的实现类,AbstractExecutorService会在submit()方法中进行预处理,将Callable类型对象转化为Runnable类型对象
public void execute(Runnable command) {if (command == null)throw new NullPointerException();         *//** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

在这里插入图片描述

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)
  2. 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
  4. 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
    在这里插入图片描述
  • addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

参数:
firstTask: worker线程的初始任务,可以为空
core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限
addWorker方法有4种传参的方式:

1、addWorker(command, true)2、addWorker(command, false)3、addWorker(null, false)4、addWorker(null, true)

在execute方法中就使用了前3种,结合这个核心方法进行以下分析
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

这篇关于线程池原理--执行器ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1061982

相关文章

Java编译生成多个.class文件的原理和作用

《Java编译生成多个.class文件的原理和作用》作为一名经验丰富的开发者,在Java项目中执行编译后,可能会发现一个.java源文件有时会产生多个.class文件,从技术实现层面详细剖析这一现象... 目录一、内部类机制与.class文件生成成员内部类(常规内部类)局部内部类(方法内部类)匿名内部类二、

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

Java的IO模型、Netty原理解析

《Java的IO模型、Netty原理解析》Java的I/O是以流的方式进行数据输入输出的,Java的类库涉及很多领域的IO内容:标准的输入输出,文件的操作、网络上的数据传输流、字符串流、对象流等,这篇... 目录1.什么是IO2.同步与异步、阻塞与非阻塞3.三种IO模型BIO(blocking I/O)NI

Spring Boot3虚拟线程的使用步骤详解

《SpringBoot3虚拟线程的使用步骤详解》虚拟线程是Java19中引入的一个新特性,旨在通过简化线程管理来提升应用程序的并发性能,:本文主要介绍SpringBoot3虚拟线程的使用步骤,... 目录问题根源分析解决方案验证验证实验实验1:未启用keep-alive实验2:启用keep-alive扩展建

Java终止正在运行的线程的三种方法

《Java终止正在运行的线程的三种方法》停止一个线程意味着在任务处理完任务之前停掉正在做的操作,也就是放弃当前的操作,停止一个线程可以用Thread.stop()方法,但最好不要用它,本文给大家介绍了... 目录前言1. 停止不了的线程2. 判断线程是否停止状态3. 能停止的线程–异常法4. 在沉睡中停止5

JAVA封装多线程实现的方式及原理

《JAVA封装多线程实现的方式及原理》:本文主要介绍Java中封装多线程的原理和常见方式,通过封装可以简化多线程的使用,提高安全性,并增强代码的可维护性和可扩展性,需要的朋友可以参考下... 目录前言一、封装的目标二、常见的封装方式及原理总结前言在 Java 中,封装多线程的原理主要围绕着将多线程相关的操

kotlin中的模块化结构组件及工作原理

《kotlin中的模块化结构组件及工作原理》本文介绍了Kotlin中模块化结构组件,包括ViewModel、LiveData、Room和Navigation的工作原理和基础使用,本文通过实例代码给大家... 目录ViewModel 工作原理LiveData 工作原理Room 工作原理Navigation 工

Java的volatile和sychronized底层实现原理解析

《Java的volatile和sychronized底层实现原理解析》文章详细介绍了Java中的synchronized和volatile关键字的底层实现原理,包括字节码层面、JVM层面的实现细节,以... 目录1. 概览2. Synchronized2.1 字节码层面2.2 JVM层面2.2.1 ente

Java捕获ThreadPoolExecutor内部线程异常的四种方法

《Java捕获ThreadPoolExecutor内部线程异常的四种方法》这篇文章主要为大家详细介绍了Java捕获ThreadPoolExecutor内部线程异常的四种方法,文中的示例代码讲解详细,感... 目录方案 1方案 2方案 3方案 4结论方案 1使用 execute + try-catch 记录

MySQL的隐式锁(Implicit Lock)原理实现

《MySQL的隐式锁(ImplicitLock)原理实现》MySQL的InnoDB存储引擎中隐式锁是一种自动管理的锁,用于保证事务在行级别操作时的数据一致性和安全性,本文主要介绍了MySQL的隐式锁... 目录1. 背景:什么是隐式锁?2. 隐式锁的工作原理3. 隐式锁的类型4. 隐式锁的实现与源代码分析4