本文主要是介绍线程池原理--执行器ThreadPoolExecutor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 线程池原理--执行器ThreadPoolExecutor
- 属性
- 构造器
- 构造器参数介绍
- execute()方法
线程池原理–总索引
线程池原理–执行器ThreadPoolExecutor
ThreadPoolExecutor 是Executor的核心实现类。
属性
- 线程池运行状态
- RUNNING:接受新的任务,并且处理队列中的任务
- SHUTDOWN: 不接受新的任务,但是仍然处理队列中的任务
- STOP: 不接受新的任务,也不处理队列中的任务
- TIDYING: 所有的任务已经结束, workerCount 为0,程序会调用钩子方法
terminated(),这个什么也没做。 - TERMINATED: 所有的任务都已经完成。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- AtomicInteger ctl
原子整形包装的线程池控制状态(The main pool control state)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs :高3位,runState , wc : 低29位,workerCount,线程池中当前活动的线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
- CAPACITY 线程池的最大容量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- 任务队列,用于存放执行未结束的任务。详细看线程池原理–任务队列BlockingQueue
private final BlockingQueue<Runnable> workQueue;
- ReentrantLock 用于对于工作集和related bookkeeping(不知道啥意思)的并发访问锁控制。
private final ReentrantLock mainLock = new ReentrantLock();
- 工作线程集,Worker是ThreadPoolExecutor类的内部类,并实现了Runnable接口,用户提交的任务都是给Worker线程执行。
private final HashSet<Worker> workers = new HashSet<>();
- Tracks largest attained pool size. Accessed only under mainLock.
private int largestPoolSize;
private final Condition termination = mainLock.newCondition();
- 完成任务的计数器,当任务结束时更新。
private long completedTaskCount;
- 创建新线程的工厂
private volatile ThreadFactory threadFactory;
- 拒绝策略,详情察看
线程池原理–拒绝策略之RejectedExecutionHandler类
private volatile RejectedExecutionHandler handler;
- 默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
- 当 allowCoreThreadTimeOut设置为true.
如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
否则,就一直等到有新的任务执行。
private volatile long keepAliveTime;
- true:核心线程即使空闲也不会被销毁掉。
- false:核心线程空闲超过keepAliveTime定义的超时时间,则会被销毁掉。
private volatile boolean allowCoreThreadTimeOut;
- 线程池核心线程大小
private volatile int corePoolSize;
- 最大的线程池大小 = 核心线程大小 + 非核心线程大小
private volatile int maximumPoolSize;
-
运行时权限
private static final RuntimePermission shutdownPerm =new RuntimePermission("modifyThread");
- 调用finalize会用到
private final AccessControlContext acc;this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();
构造器
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
构造器参数介绍
下面来解释下各个参数:
-
int corePoolSize:该线程池中核心线程数最大值
核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程,核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉。 -
int maximumPoolSize: 该线程池中线程总数最大值
线程总数 = 核心线程数 + 非核心线程数。 -
long keepAliveTime:
当 allowCoreThreadTimeOut设置为true.
如果当前线程池中的线程如果大于corePoolSize,那么如果空闲时间超过keepAliveTime,那么就会销毁掉一些线程。
否则,就一直等到有新的任务执行。 -
TimeUnit unit:keepAliveTime的单位
TimeUnit是一个枚举类型,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天 -
BlockingQueue ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue: 一个不存储元素的阻塞队列。
- LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
-
ThreadFactory threadFactory:线程工厂,用于创建线程执行 我们提交的任务。
-
RejectedExecutionHandler handler:这个指定当队列满时继续添加任务该u做如何处理,详细可以看线程池原理–拒绝策略之RejectedExecutionHandler类。
execute()方法
execute()方法用于提交任务。
//提交的Runnable接口的实现类
//实际用户可以提交Runnable接口的实现类或者Callable接口的实现类,AbstractExecutorService会在submit()方法中进行预处理,将Callable类型对象转化为Runnable类型对象
public void execute(Runnable command) {if (command == null)throw new NullPointerException(); *//** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)
- 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
- 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
- addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
参数:
firstTask: worker线程的初始任务,可以为空
core: true:将corePoolSize作为上限,false:将maximumPoolSize作为上限
addWorker方法有4种传参的方式:
1、addWorker(command, true)2、addWorker(command, false)3、addWorker(null, false)4、addWorker(null, true)
在execute方法中就使用了前3种,结合这个核心方法进行以下分析
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行
这篇关于线程池原理--执行器ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!