本文主要是介绍源码分析-ThreadPoolExecutor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
接口
首先说一下几个Executor相关的接口
- Executor:这个接口的主要功能的使是的策略与机制的分离,主要就是可以将提交任务的线程和执行任务的线程分开,异步的进行。当然特殊情况下也是同步进行的。只有一个方法就是
void execute(Runnable command)
- ExecutorService:这个接口功能就要复杂一些了,和Excutor的区别主要在两点:
- 提供了取消和关闭执行器的方法。shutdown方法拒绝继续提交的任务并等待已提交的任务完成,shutdownnow则强行关闭。
- 返回了Future用于监视任务的进行状态。以及invokeAny和invokeAll用于任务组的执行状态监控。
- ScheduledExecutorService用于延时执行或者周期性的执行。
AbstractExecutorService
AbstractExecutorService是实现了基本的ExecutorService的接口。实现了上小节中提到的submit,invokeAny和invokeAll方法至于shutdown,由于不同的执行器有自己的想法所以这里没有进行统一。这里主要看下这个几个方法:
submit方法
首先说明下Callable和Runnable的区别
Callable接口只有一个方法就是call(),是带有返回值的,而且不会抛出已检查异常。
Runnable接口也只有一个方法就是run(),是没有返回值的。
submit有三种方法:
<T> Future<T> submit(Callable<T> task)
Future<?> submit(Runnable task)
<T> Future<T> submit(Runnable task, T result)
三个方法都比较类似这里只看一个就行,调用newTaskFor生成一个RunnableFuture,然后调用execute(ftask)执行任务,然后返回RunnableFuture,newTaskFor很简单就是调用了一下FutureTask的构造器,等到看FuturTask的源码的时候在详细说。
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}
InvokeAny
invokeAny意思是当有一组任务提交后,只要有任意任务完成就返回结果。在AbstractExecutorService,invokeAny有两个版本的参数,区别在于是否使用了时限功能。但本质上来说都是通过一个内部的方法doInvokeAny
实现的这里仅仅看一下这个doInvokeAny
:
doInvokeAny思路上比较简单就是把任务委托给了一个ExecutorCompletionService。ExecutorCompletionService有一个队列里面放着的是按照完成顺序排列的Future。然后在一个循环内不断尝试读取这个队列到一个Future引用,然后根据Future是否为null来分情况讨论。等到返回或者跳出循环的时候尝试取消其他任务。
此外这段程序为了预防所有任务都无法完成还记录了总任务数和已经激活的任务数,如果所有任务都没有完成则抛出异常。
至于ExecutorCompletionService是如何实现先返回执行完成的任务的呢。这里先简单说一下,对于ExecutorCompletionService的submit方法会先调用一个Future的构造器构造一个Future,然后调用excute,然后返回这个future。
关键的地方在于这个构造的Future是一个FutureTask的子类,FutureTask中有一个done方法是当FutureTask状态改变的时候(一般就是任务完成或者取消的时候)执行,在ExecutorCompletionService中重写了done方法,在done方法中会将当前的FutureTask添加到队列中。
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {if (tasks == null)throw new NullPointerException();int ntasks = tasks.size();//表示当前任务数量if (ntasks == 0)throw new IllegalArgumentException();List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);//这个队列放的是已经激活的任务,当有任务返回或者所有任务都结束了的时候,在finally里将所有未完成任务取消已节省资源。ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);try {// Record exceptions so that if we fail to obtain any// result, we can throw the last exception we got.ExecutionException ee = null;long lastTime = timed ? System.nanoTime() : 0;Iterator<? extends Callable<T>> it = tasks.iterator();// Start one task for sure; the rest incrementallyfutures.add(ecs.submit(it.next()));--ntasks;int active = 1;//active表示的是已经激活的任务数量for (;;) {Future<T> f = ecs.poll();//尝试非阻塞的获得ecs的Futureif (f == null) {//future为空说明没有任务结束,if (ntasks > 0) {//ntasks大于0说明还有任务没提交--ntasks;futures.add(ecs.submit(it.next()));++active;}else if (active == 0)//正常情况下active不会为0的。我理解还不是太好。目前来看我觉得应如果当所有任务都无法完成用来跳出循环。也就是虽然返回了Future,但是无法通过Future获得结果,而是抛出了异常。break;else if (timed) {//对于限时任务需要有记录时间以抛出异常f = ecs.poll(nanos, TimeUnit.NANOSECONDS);if (f == null)throw new TimeoutException();long now = System.nanoTime();nanos -= now - lastTime;lastTime = now;}else//如果没有实现且所有任务都已提交且还有任务没有完成则使用阻塞方法。这也不浪费CPU时间。f = ecs.take();}if (f != null) {//如果得到了future,--active;//减少一个已激活的程序try {return f.get();} catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);}}}if (ee == null)ee = new ExecutionException();throw ee;} finally {//取消所有未完成任务。for (Future<T> f : futures)f.cancel(true);}}
invokeAll
invokeAll相对来说就简单很多,就是用一个Futures队列来维护所有的任务,然后使用阻塞方法遍历invokeAll并且了所有的异常,如果任务未完成则将剩余任务取消。
ThreadPoolExecutor
域
ThreadPoolExcutor的域的内容比较多,这里主要分成两大类,一类是内部结构类,一类是用户参数控制类:
private final AtomicInteger ctl
ctl域表示的使main pool control state。使一个AtomicInteger,是用来表示两个量:
- workerCount:表示有效的线程数量。
- runState:表示线程的状态
ctl使用32位中的前3位来表示runState,使用后29位来表示workerCount。
对于runState有5种状态。分别用-1,0,1,2,3来表示:runState的状态只能递增不能递减。这5种状态分别是:
- RUNNING:接受了一个新任务,并且排队处理任务
- SHUTDOWN:不接受新任务,但是正在处理队列中的任务
- STOP:不接受新任务,不处理任务,中断正在处理的任务
- TIDYING:所有任务都已经终止,WorkerCount是0,thread将状态转换到TIDYING状态,并且运行terminated挂钩方法。
- TERMINATED:terminated方法运行完毕。
5种状态之间的转换有一定限制不是任意状态都会转换的。转换关系如下图:
这张图其实并不准确,但是不知道为什么源码中只给出这些状态,比如说SHUTDOWN可以转移到TIDYING(从tryterminate方法看出来),不过从目前来看我也只找到一种特例
有一系列操作这些内容的方法,内容也比较简单。
private final BlockingQueue<Runnable> workQueue
这个队列用于保存任务,并且将任务交付给worker线程。注意不要使用workQueue.poll()==null
来判断队列是否为空,因为如果是在一些特殊应用中,比如使用了DelayQueues,也许是因为没有到达时限才返回null的。
private final ReentrantLock mainLock
这个锁用来控制workers集的访问和相关的记录权限。这里为何不使用一个并发集合,而是用一个锁?
- 为了串行的执行interruptIdleWorkers,避免在interruptIdleWorkers过程中引起中断风暴(比如在shutdown期间),使得正在退出的线程中断了尚未退出的线程。
- 为了简化在大的线程池的管理。
在shutdown和shutdownNow中持有锁,以便单独检查workers各个线程的中断。
其他域:
private final HashSet<Worker> workers
private final Condition termination
private int largestPoolSize;
private long completedTaskCount;
用户参数控制域
大部分的用户参数控制域都使用 volatile标识符:
private volatile ThreadFactory threadFactory
所有的线程都是由这个线程工厂生产的,如果要增加worker线程只需要调用addWoker就好。
需要注意的使addworker可能由于资源限制而无法产生新的线程,这时候可能会导致拒绝任务或者将其滞留在队列里。
其他域
private volatile RejectedExecutionHandler handler;//对于饱和拒绝的处理器
private volatile long keepAliveTime;//对于空闲线程的等待时间
private volatile boolean allowCoreThreadTimeOut;//是否允许核心线程超时
private volatile int corePoolSize;//worker的最小线程数(allowCoreThreadTimeOut不为ture)
private volatile int maximumPoolSize;//最大允许线程数
静态域
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
这部分的内容涉及的内容比较多。暂时先略过。
其他静态域:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
Worker
Worker是继承自队列同步器,因此在说明Worker之前有必要简单说明一下队列同步器。
通过AbstractQueuedSynchronizer你可以自定义一些锁或者是其他的同步组件。具体的实现过程等到AbstractQueuedSynchronizer的源码分析的时候在详细说明。这里仅仅说明一下如何使用AbstractQueuedSynchronizer。
首先,如何组织AbstractQueuedSynchronizer,AbstractQueuedSynchronizer是一个抽象类,在使用的时候先用一个类继承自AbstractQueuedSynchronizer,然后使用组合的方式使这个类变成新的同步组件类的一个域,对新的同步组件类的lock或者release方法委托给AbstractQueuedSynchronizer子类进行操作。
其次,如何设计AbstractQueuedSynchronizer的子类,首先AbstractQueuedSynchronizer有5个protect的方法,具体 如下
tryAcquire(int)
tryRelease(int)
tryAcquireShared(int)
tryReleaseShared(int)
isHeldExclusively()
默认情况下调用会抛出异常,在使用的时候只需要重写这几个方法就好了。但是再重写的时候需要注意两点,第一点,对于状态的设置不要自己随意设置,建议使用AbstractQueuedSynchronizer的getState()
, setState(int)
和 compareAndSetState(int, int)
来原子性的更新状态。其次这几个方法是非阻塞的方法不能产生任何阻塞操作,并且竟可能的快速完成这部分操作。
当然在Worker中并没有使用委托的方法而是直接继承了。这样也是可以的只是占据了宝贵的继承位置。
Worker的域:
final Thread thread;//Worker包装的线程Runnable firstTask;//装入的任务volatile long completedTasks;//完成的任务数量
Worker的实现比较简单,0状态表示解锁状态,1状态表示加锁状态,实现都是队列同步器的标准实现这里就不说明了。这里只有一个方法interruptIfStartd方法也就是只在运行任务的时候(状态为1的时候才可以中断否则不中断)
这里还忽略了SecurityException 异常。(这部分内容不太清楚先往后看。)
void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
设置控制状态方法
advanceRunState
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||//如果ctl大于targetStat则设置无效返回。ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))//设置ctl至少为targetState,ctlOf的作用是位与break;}}
自旋设置ctl,直到ctl状态大于等于targetState。
tryTerminate
在之前的ctl中我们曾经说过ThreadPoolExecutor的状态转移。其中可以看到shutdown和shutdownNow是可以控制前三个状态的,而如果想要转换到后2种状态并不是通过用户调用来实现的。而是程序自动实现的,那如何来自动实现?从实现思路上来说,就是在当有可能出现terminated的情况下(比如Worker减少,或者任务取消)的情况下都调用tryTerminate,然后在tryTerminate中再判断是否满足条件((shutdown状态并且任务池和线程池为空)或者(STOP状态并且线程池为空))应该进行terminate。
final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||//在running状态及时有Worker减少或者任务取消也不应该terminate的所以退出,需要注意的是当RunState小于shutdown的时候isRunning(c)都返回false;runStateAtLeast(c, TIDYING) ||//如果当前状态为TIDYING以上,则说明有其他线程执行tryTerminate并且成功terminate,现在的线程不应该再terminate,而是退出(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//如果当前的线程为shutdown并且任务队列还没有处理完,则应该等到任务处理完的情况下再terminate,所以这里也应该退出return;if (workerCountOf(c) != 0) { // Eligible to terminate//这里还有任务没有结束,则应该尝试先取消等待的空闲线程然后在terminate所以调用interruptIdleWorker。interruptIdleWorkers(ONLY_ONE);//这里ONLY_ONE表示只移除一个Worker线程。return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置TIDYING状态,并使得任务数量为0。try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
这里还有一个interruptIdleWorkers(ONLY_ONE)
的小段程序需要说明一下。这段程序的调用的使interruptIdleWorkers(boolean false)
括号内的boolean表示是否只中断一个线程。
正常情况下都为false。并且这个interruptIdleWorkers(boolean)
还有一个interruptIdleWorkers()
的版本作为interruptIdleWorkers(false)
的别称。
而ONLY_ONE为 true也是,而tryTerminate也是唯一一个有可能调用interruptIdleWorkers(ONLY_ONE)
的程序。这里程序的意义理解的不是很清晰,先尝试解释一下。
按照源码中的说法大概是这样的意思。如果调用tryTerminate的时候既有worker线程正在工作,又有worker线程正在等待线程完成工作。这时候如果要等到所有线程都完成工作显得比较傻。所以这时候就尽量将可以中断的空闲Worker中断掉,然后退出就好。
这样的好处是什么?
首先,什么样的情况下会运行这句话?
应该注意当程序运行到这一步的时候ctl的状态必然处于(shutdown并且worker为0)或者(stop)的状态。而这时候worker不为0则只可能是stop状态。而所谓的stop状态通常也就是调用shutdownNow的结果。也就是说已经把任务队列清空了。这时候需要将状态转换到TIDYING,也就是需要将worker的count变为0。
那现在来看看基本的程序逻辑。调用了shutdownNow,然后使得ctl处在STOP状态下,然后清空任务队列。然后又将空闲的线程中断。这样做会发生什么?
那剩下正在工作的worker线程会怎么样。在后面会看到当一个worker线程完成工作中会调用getTask来获得新的任务去执行,而在getTask程序中如果ctl处于STOP状态,而且任务队列为空则会降低worker的数量。并且返回null。这时候调用runWorker程序则会退出循环。运行processWorkerExit
,而processWorkerExit
则会再次调用tryTerminate,这样循环下去终会发生一种情况就是workercount变为0了。
如此一来程序最终会运行到tryTerminate
程序的后半段,也就是设置TIDYING状态,然后调用terminate函数,然后进入TERMINATE状态。
以上的过程也就是所谓的优雅退出。还是有点麻烦的。
这里有一点需要注意。在说到ctl中的workerCount的时候我经常有一个错觉就是WorkerCount就是worker的数量也是就Worker集合的大小,而实际上从这里来看似乎并不是这样。其实在源码中说明的也是很详细了但是我没有注意,workerCount实际上是有效线程的大小。也就是说如果一个线程如果中断了。那就不能算是有效线程了。当然一个中断的线程依然是占有资源的。那么我现在的一个问题就是一个ThreadPoolExecutor采用何种的策略去维护这线程占有的资源?我觉得这是一个很有意思的话题。这里留个坑,等我把主要的程序都看完了再去分析一下。
interruptIdleWorkers
既然上文说到这个问题这里还是来看一下interruptIdleWorkers。当然这个程序本身比较简单就是上锁后使用迭代器依次判断是非有程序可以中断,如果有可以中断的则中断它。如果onlyOne是true则中断一次后就退出。如果onlyOne是false则 尝试中断所有程序。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
当然这个程序是用来中断空闲程序了。还有一个程序就不这么客气了。是用来中断所有已经开始的程序的
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}
当然在之前说过,interruptIfStarted会忽略所有安全检查异常。
创建、运行和清理Worker的程序
addWorker
首先,addWorker需要判断在何种情况下才能创建Worker线程。通过返回的boolean来判断是否创建成功。
当pool状态是stop以上状态都不能创建,shutdown状态通常也不行,而且还需要满足Worker线程的数量限制,以及factory请求资源足够,并且创建的线程有资源去运行才行。
如果创建失败、或者factory缺乏足够的资源、或者抛出了异常之类的情况都需要进行滚回清理。
此外输入参数firstTask,通常是worker需要运行的第一个任务,但是firstTask也有可能为null。
创建的worker会有多种类型这里简单总结一下:
- 当worker数量小于corePoolsize的时候或者任务队列满的时候,需要创建一个带有初始任务firstTask任务的Worker,这个worker会先绕过任务队列。独自创建初始任务这时候firstTask不为null
- 有时候也会创建一个firstTask为null的线程,一般在两种情况下一种是需要预先创建coreThread要求的情况下,或者用来替代其他空闲的worker线程。
另外还需要说明一下语句标签。这里也是我第一次看到有这样的用法。可以算是java版本的goto语句。用法可以参看这篇博客java中的goto实现
简单的说就是语句标签需要和循环语句协同使用,主要用来跳出多层循环,且只能在break和continue之后跳转。不过一般也用不到。
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&//如果状态大于SHUTDOWN则无法创建! (rs == SHUTDOWN &&//在shutdown情况下如果队列为空则不可以创建无初始任务的线程。好像没看错吧。。这里逻辑太绕了。firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||//如果worker线程超过限制则无法创建wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//如果成功增加WorkerCount就跳出这个retry部分的循环。这也是唯一的出口。break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)//不一致错误重新尝试,因为可能在内层循环的过程中出现不一致问题。如果不及时判断一致进行内循环则会无限循环下去。continue retry;// else CAS failed due to workerCount change; retry inner loop}}
//进行到这一步则说明已经成功验证可以增加worker并成功增加workercount。剩下的任务就是新建一个worker
并增加到集合中,这部分比较常规不在多解释boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN ||//在running状态下是可以正常增加线程的(rs == SHUTDOWN && firstTask == null)) {//在shutdown状态下只能增加空线程,而不可以增加绕过任务队列的无任务线程。原因参考shutdown状态的意义。if (t.isAlive()) // precheck that t is startable//检查新建线程是否有效。throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);//清理工作}return workerStarted;}
清理工作比较简单,就是三部分:
- 如果worker在集合内,则移除它
- 降低WorkerCount
- 调用tryterminate。
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);decrementWorkerCount();tryTerminate();} finally {mainLock.unlock();}}
runWorker的运行
这个程序用来不断的获取任务并且让Worker线程运行它,也会用来处理一些特殊的情况:
- 通常Worker始于一个firstTask任务,如果FirstTask为null 则需要调用getTask来从任务队列获得任务。然后让worker运行它。如此循环。如果getTask返回null则说明worker应该退出,通常是用于运行状态变化或参数变化。当然抛出异常也会导致worker退出。这时会根据completedAbruptly来判断是否需要使用processWorkerExit来取代当前线程
- 在运行任务之前需要对Worker上锁,(work继承自队列同步器可以上锁的),张锁定目的是防止在任务执行期间,线程池内的其他中断影响Worker的任务执行。(clearInterruptsForTaskRun用来保证在状态变为stop之前,这个worker线程不会被中断所影响。这句话不是很理解);
- 在执行任务之前都会调用一个beforeExecute。在beforeExecute中可能会抛出一些异常使得在任务尚未执行之前就终止的程序。(终止循环并且completedAbruptly为true)
- 假设beforeExecute正常完成。我们将会运行任务,并且搜集任务可能抛出的异常,并将其发送到afterExecute,对于RuntimeException和Error和Throwable会分别处理,当然任意异常也会导致线程死亡。
- 任务运行完成之后会调用afterExecute,当然这个afterExecute也会抛出异常,这样的异常也会导致线程死亡。
首先,在看runWorker的运行机制之前,先考虑下载什么情况下Worker线程会中断。只有2种情况下是会中断Worker线程的。一是在stop状态下中断,也就是调用了shutdownNow方法或者从shutdown状态下当所有线程都结束任务后才进入stop状态。除此之外还有一种情况也会中断线程,也就是当前线程属于核心线程边界内,而任务又被处理完了,则需要保留一定数量的核心线程。
其次,Worker中的锁是用来控制什么的。其实到目前为止我都不太明白这个Worker的锁是用来锁住何种操作的,从目前来看的话锁似乎是空来控制Worker工作状态的,简单的说就是先判断是否需要中断,如果需要中断则中断,如果不需要中断则运行当前的线程去执行任务。在runWorker中表现的比较明显,下面的程序会看到。
最后,还有一点是java中间中断的使用,java的中断一般使用Thread对象的interrupt方法(stop太暴力,已经被遗弃了),但是java的中断和C中的中断不太相同,java有时候中断并不会被响应,具体的可以参考api中的介绍。但是通常来说只有在阻塞的时候才会响应中断。所以响应中断一般有2种方式,一种就是当程序运行到阻塞或者yield之类的地方的时候,这时如果中断就会抛出中断异常,第二种就是使用Thread对象的isinterrupt或者interrupted方法。其中interrupted会清除中断标志,因此也可以作为一种主动的中断处理方式。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//需要运行的任务使用一个task来指向,而不使用Worker中的参数来指向,因为在运行完任务后还需要不停的通过getTask来获取新的任务。w.firstTask = null;w.unlock(); // allow interrupts//打开允许中断的锁。这里指的允许中断实际上指的是允许其他线程去争抢线程。boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {//getTask会获得将要执行的任务w.lock();//获得锁也就代表着获得线程的运行权利,// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||//下面这四行判断语句用来表示当线程需要中断且没有中断的时候用来中断当前的线程,应该注意的是wt获取的是当前的线程而当前的线程实际上就是工作的线程。此外还有1点需要注意,这里为什么要判断Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))这句话。典型的情况是先前有核心线程运行完任务后保留下来,处于空闲的状态,而当有新的线程开始工作开始工作的时候有可能又处于stop状态。所以需要将中断标志位清零,然后在判断是否处于STOP状态如果是则需要再次中断。而这里还有一个功能就对于空闲线程这样做会清理掉其中断标志从而继续运行。(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);//beforeExecuto和afterExcute都是protected的hook方法。Throwable thrown = null;try {task.run();//对于运行期间产生的所有异常都需要抛出。对于无法自行抛出的则包装成Error抛出。} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;//这里即使是运行异常的任务也应该使得completedTasks增加w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);//清理工作}}
这里开始先看下getTask:
getTask会阻塞式或者带时间限的获得任务,取决于当前的配置,或者在下列情况发生时返回null:
- 超过线程最大线程数量限制
- 线程池stop
- 线程池shutdown并且队列为null
- 获取线程超时。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);
//线程池stop、线程池shutdown并且队列为空,则返回null// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?//这里的timed应该表示过时更为合适。而不是时间限。for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果允许核心线程超时或者不允许核心线程撤销,但是当前线程超过核心线程if (wc <= maximumPoolSize && ! (timedOut && timed))//线程不过时,而且线程可以增长,则break,说明可以正常获取任务。break;//break的是内层循环。也就是通过检测可以获取线程。if (compareAndDecrementWorkerCount(c))return null;//未通过检测不可增加线程。如果未能降低Worker则需要重试,不停的检测直到成功降低。这里为何不使用decrementWorkerCount,因为可能出现不一致读的情况。c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)//不一致状态,需要重读。continue retry;// else CAS failed due to workerCount change; retry inner loop}try {Runnable r = timed ?//按要求获取任务。workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;//这里的timedout是用来保存最后一次获取是否超时的。这里我比较好奇的是为什么不直接返回r,可能的原因是java语言的限制,也就是语句标签需要配合循环使用而不能单独使用,所以这里通过一个标志位timeOut来在下一次循环中进行返回。有没有其他原因暂时想不太明白。} catch (InterruptedException retry) {timedOut = false;}}}
processWorkerExit
执行清理工作,并且记录空闲线程。只从工作线程调用。
completedAbruptly用来表示workerCount是否已经降低。通常由于用户的任务执行过程中抛出了异常而退出。
这个方法的主要作用是为了从workerSet中移除线程。并且有可能导致线程池或者替换当前线程(由于用户任务的过程中抛出异常,或者,任务队列中海油任务,但是没有Worker线程。)
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();
//如果是由于任务线程抛出异常导致的退出,则需要降低workerCount;final ReentrantLock mainLock = this.mainLock;mainLock.lock();//workerSet集合锁try {completedTaskCount += w.completedTasks;workers.remove(w);//删除线程w} finally {mainLock.unlock();}tryTerminate();//尝试终止。int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {//如果线程不是由于任务抛出,而是由于没有足够任务需要而退出的,则继续根据条件判断是否需要增加替换线程int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);//如果线程不是Stop状态,且由于用户任务抛出异常则需要补充替换线程。当然如果不是由于用户任务抛出异常导致的退出,也需要根据情况来判断是否需要增加替换线程。}//如果处于pool处于线程池stop以上状态不可能增加线程则不需要考虑增加线程。}
公共方法
公共方法比较多,但是基本上也比较简单。没有复杂逻辑的地方。这里就只看一个比较重要的execute方法。
基本上分三步
第一步:如果线程数少于核心线程,则创建一个新的自带firstTask的worker。当然addWorker有可能失败所以需要根据其情况进一步判断,如果成功则退出,如果失败则需要重新读取ctl。
第二步:如果任务可以顺利进入队列则还需要判断是否需要增加新的worker线程。这里还需要再次进行检查pool是否处于运行状态,如果不是则需要移除任务,如果workCountof等于0(由于任务调用失败而导致线程移除)则需增加worker线程。
第三步:如果无法让任务进入任务队列,则尝试新增一个worker线程,如果依然失败则拒绝任务。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
这篇关于源码分析-ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!