线程池原理--执行器ThreadPoolExecutor

2024-06-15 00:58

本文主要是介绍线程池原理--执行器ThreadPoolExecutor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 线程池原理--执行器ThreadPoolExecutor
    • 属性
    • 构造器
      • 构造器参数介绍
    • execute()方法

线程池原理–总索引

线程池原理–执行器ThreadPoolExecutor

ThreadPoolExecutor 是Executor的核心实现类。

属性

  • 线程池运行状态
    • RUNNING:接受新的任务,并且处理队列中的任务
    • SHUTDOWN: 不接受新的任务,但是仍然处理队列中的任务
    • STOP: 不接受新的任务,也不处理队列中的任务
    • TIDYING: 所有的任务已经结束, workerCount 为0,程序会调用钩子方法
      terminated(),这个什么也没做。
    • TERMINATED: 所有的任务都已经完成。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

在这里插入图片描述

  • AtomicInteger ctl
    原子整形包装的线程池控制状态(The main pool control state)。
    在这里插入图片描述
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs :高3位,runState , wc : 低29位,workerCount,线程池中当前活动的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • CAPACITY 线程池的最大容量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  • 任务队列,用于存放执行未结束的任务。详细看线程池原理–任务队列BlockingQueue
private final BlockingQueue<Runnable> workQueue;
  • ReentrantLock 用于对于工作集和related bookkeeping(不知道啥意思)的并发访问锁控制。
private final ReentrantLock mainLock = new ReentrantLock();
  • 工作线程集,Worker是ThreadPoolExecutor类的内部类,并实现了Runnable接口,用户提交的任务都是给Worker线程执行。
private final HashSet<Worker> workers = new HashSet<>();
  • Tracks largest attained pool size. Accessed only under mainLock.
private int largestPoolSize;
private final Condition termination = mainLock.newCondition();
  • 完成任务的计数器,当任务结束时更新。
private long completedTaskCount;
  • 创建新线程的工厂
private volatile ThreadFactory threadFactory;
  • 拒绝策略,详情察看
    线程池原理–拒绝策略之RejectedExecutionHandler类
private volatile RejectedExecutionHandler handler;
  • 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  • 当 allowCoreThreadTimeOut设置为true.
    如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
    否则,就一直等到有新的任务执行。
 private volatile long keepAliveTime;
  • true:核心线程即使空闲也不会被销毁掉。
  • false:核心线程空闲超过keepAliveTime定义的超时时间,则会被销毁掉。
private volatile boolean allowCoreThreadTimeOut;
  • 线程池核心线程大小
private volatile int corePoolSize;
  • 最大的线程池大小 = 核心线程大小 + 非核心线程大小
private volatile int maximumPoolSize;
  • 运行时权限

 private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");
  • 调用finalize会用到
 private final AccessControlContext acc;this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();

构造器

 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

构造器参数介绍

下面来解释下各个参数:

  • int corePoolSize:该线程池中核心线程数最大值
    核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程,核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
    如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉。

  • int maximumPoolSize: 该线程池中线程总数最大值
    线程总数 = 核心线程数 + 非核心线程数。

  • long keepAliveTime
    当 allowCoreThreadTimeOut设置为true.
    如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
    否则,就一直等到有新的任务执行。

  • TimeUnit unit:keepAliveTime的单位
    TimeUnit是一个枚举类型,其包括:
    NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    MICROSECONDS : 1微秒 = 1毫秒 / 1000
    MILLISECONDS : 1毫秒 = 1秒 /1000
    SECONDS : 秒
    MINUTES : 分
    HOURS : 小时
    DAYS : 天

  • BlockingQueue ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

    • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
    • DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue: 一个不存储元素的阻塞队列。
    • LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
  • ThreadFactory threadFactory:线程工厂,用于创建线程执行 我们提交的任务。

  • RejectedExecutionHandler handler:这个指定当队列满时继续添加任务该u做如何处理,详细可以看线程池原理–拒绝策略之RejectedExecutionHandler类。

execute()方法

execute()方法用于提交任务。

//提交的Runnable接口的实现类
//实际用户可以提交Runnable接口的实现类或者Callable接口的实现类,AbstractExecutorService会在submit()方法中进行预处理,将Callable类型对象转化为Runnable类型对象
public void execute(Runnable command) {if (command == null)throw new NullPointerException();         *//** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

在这里插入图片描述

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)
  2. 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
  4. 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
    在这里插入图片描述
  • addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

参数:
firstTask: worker线程的初始任务,可以为空
core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限
addWorker方法有4种传参的方式:

1、addWorker(command, true)2、addWorker(command, false)3、addWorker(null, false)4、addWorker(null, true)

在execute方法中就使用了前3种,结合这个核心方法进行以下分析
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行

这篇关于线程池原理--执行器ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1061982

相关文章

Java多线程父线程向子线程传值问题及解决

《Java多线程父线程向子线程传值问题及解决》文章总结了5种解决父子之间数据传递困扰的解决方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDeco... 目录1 背景2 ThreadLocal+TaskDecorator3 RequestContextH

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J

MySQL中的MVCC底层原理解读

《MySQL中的MVCC底层原理解读》本文详细介绍了MySQL中的多版本并发控制(MVCC)机制,包括版本链、ReadView以及在不同事务隔离级别下MVCC的工作原理,通过一个具体的示例演示了在可重... 目录简介ReadView版本链演示过程总结简介MVCC(Multi-Version Concurr

异步线程traceId如何实现传递

《异步线程traceId如何实现传递》文章介绍了如何在异步请求中传递traceId,通过重写ThreadPoolTaskExecutor的方法和实现TaskDecorator接口来增强线程池,确保异步... 目录前言重写ThreadPoolTaskExecutor中方法线程池增强总结前言在日常问题排查中,

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

SpringCloud配置动态更新原理解析

《SpringCloud配置动态更新原理解析》在微服务架构的浩瀚星海中,服务配置的动态更新如同魔法一般,能够让应用在不重启的情况下,实时响应配置的变更,SpringCloud作为微服务架构中的佼佼者,... 目录一、SpringBoot、Cloud配置的读取二、SpringCloud配置动态刷新三、更新@R

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

C语言线程池的常见实现方式详解

《C语言线程池的常见实现方式详解》本文介绍了如何使用C语言实现一个基本的线程池,线程池的实现包括工作线程、任务队列、任务调度、线程池的初始化、任务添加、销毁等步骤,感兴趣的朋友跟随小编一起看看吧... 目录1. 线程池的基本结构2. 线程池的实现步骤3. 线程池的核心数据结构4. 线程池的详细实现4.1 初