ThreadPoolExecutor线程池 —————— 开开开山怪

2024-02-07 07:48

本文主要是介绍ThreadPoolExecutor线程池 —————— 开开开山怪,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ThreadPoolExecutor线程池

在这里插入图片描述
说起线程池大家一定都不陌生,其实在很多地方都有应用。
当我们解释为什么使用线程池的时候,我们都会说线程池可以减少线程创建的开销,节省系统的资源。
但其实线程池真正做到我们说的那样,还和我们设置的参数有关。我们都知道在我们使用线程池的时候,开头要搞很多参数。

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue){}                                                             

这些个参数,相信你们一定不会陌生,在使用的时候还要考虑半天,这参数我该怎么设置好呢?

那我们首先来了解这些个参数的意义,这些参数大多对应的也是线程池中的成员,既然线程池的源码中对于往线程池中添加一个任务采用的是这样 addWorker , Worker, workerCountOf.

看到Woker这样的字眼,我觉得采用工人这个词来可能会更加的清晰。

先说一个误区:在我们new ThreadPoolExecutor()的时候设置核心线程数,不是说我们new 完之后这些个线程就已经存在,而是之后我们向线程池采用execute添加任务(Runnable)时才会创建线程,核心线程的数量只能说明是一个数量上的限制,当超过这个数量就是另当别论了。

其实采用execute添加Runnable任务的时候,是将任务Runnable交给一个Worker,然后启动该worker,进行任务的执行。
所以线程池中的线程,我下面都采用工人(Worker)这个字眼来代替.

1. private volatile int corePoolSize;

例如 corePoolSize = 5
线程池始终保持有5个工人,不管这些个工人是在核心工人数没有到达5这个阀值时创建的工人,还是由于已经有5个工人在执行任务,已经创建的5人都在忙的状态,而此时阻塞队列也满了,线程池会创建新的工人(也即后创建的工人)。

不管是先创建的还是后创建的,不会说等所有工人都没有任务执行的时候保留先创建的5个,而解除后创建的工人这一说。而是始终保持所有创建的工人中的5个。

2. private volatile int maximumPoolSize;
线程池能容纳的最多的工人数量,

3. private volatile long keepAliveTime;
当工人的数量大于核心工人的数量的时候,不管是哪个工人执行完自己手头的任务,去阻塞队列进行取任务的时候,都会等待这个时间,如果阻塞队列中有任务,那就万事大吉,这个工人还能用,如果阻塞队列中迟迟没有任务,那么该工人就采用这个时间等等,时间过了,没有任务,那么该工人就该下岗了。

4. TimeUnit unit 这个不是成员,
例如unit = TimeUnit.MILLISECONDS,是采用毫秒进行计算
就是由用户来设置,对于 keepAliveTime采用怎样的单位进行计算,计算出最终的等待时间,

5. private final BlockingQueue workQueue;
用于存储任务的阻塞队列,当核心数量的工人都在执行任务的时候,再来新的任务,就会添加到这个阻塞队列中。

正儿八经线程池工作原理

情况1 : 当工人的数量没有超过核心工人数量的这个阀值的时候

来一个任务新建一个工人,等待工人去执行,那么此时来说,阻塞队列肯定为空,如果此时没有任何的新任务进来,那么工人执行完自己的任务也不闲着,会去阻塞队列中去看一眼,看有没有任务等待执行,那么此时肯定是没有的,所以当前执行完自己任务的工人就会阻塞起来。

情况2当工人数量达到核心工人数量的时候,但阻塞队列没有满的时候

此时是不会针对新来任务创建工人的,而是将任务加到阻塞队列中,所以针对 情况1 的工人阻塞就可以处理了,因为 情况2 是不会创建新的工人,只是将任务加入到阻塞队列,所以如果在 情况1 由于工人没有任务可执行,工人阻塞起来,那么在此时,由于阻塞队列有任务,就会唤醒那些阻塞起来的工人,阻塞队列中加一个任务,唤醒一个工人。那么工人就有任务可执行了,(这也就是我们为什么说线程池节省系统资源的原因,因为工人没闲着,只要有活就干,相当于一个线程执行完一个任务,线程不会消亡而是在任务队里中再拿任务执行,可以做到线程的循环使用)。

情况3当工人达到核心工人数量的时候,阻塞队列也已满,而且核心数量的这哥几个工人还都手头有任务没有执行完。

此时如果有任务来,线程池就不得不创建工人了,没办法,任务都堆积如山了,那么此时新创建的该工人就执行当前任务。
如果此时由任务来,会出现两种情况:
3.1 :该新建工人还在执行任务,之前的工人也还都在执行任务,阻塞队列还是满的,那么线程池就再创建一个工人执行新任务,如果一直是我说的这种情况,(不管是之前的工人还是现在的工人都在执行任务,并且阻塞队列里的任务还都是满的话),那么对于新任务就创建一个工人,直到工人的数量大于线程池容纳最多工人的数量,此时线程池就不接任务了,同时也会执行拒绝策略 (但是一般这种情况很少出现)。

3.2:新创建的工人或者之前的工人有一个或多个已经将手头的任务执行完了,那么此时阻塞队列就会有空位,因为那些执行完任务的工人会去阻塞队列去拿任务执行,那么此时来的新任务就进入阻塞队列,而不是重新创建工人执行了。

对于上边的 情况3 来说,不管是情况3 下两种情况的哪一种情况,都说明已经建立了新的工人(比核心数量多的工人),顾名思议就是,此时的工人的数量已经超过核心工人数量。

对于 情况3 的第一种情况来说属于极端情况,那当然极端情况就是线程池不再接受新的任务了,就算不接受新的任务,那么线程池也会面临下面的问题。
迟早工人们(工人数量大于核心工人的数量)会完成自己的手头工作,去阻塞队列中拿任务执行,阻塞队列任务也会被拿完的情况(这种情况当然是我们的正常情况,哪有说工人执行任务的速度还赶不上添加任务的速度,那这效率也太慢了).

此时分为两种情况,只要有工人完成手中的任务,去阻塞队列中拿任务,拿到任务执行就完事了,随着所有工人渐渐的完成自己手中任务都挨个去阻塞队列中拿任务的时候,总有人会因为阻塞队列为空而拿不到任务,此时我们说的那个线程池中的变量keepAliveTime派上用场了,如果某个工人等了keepAliveTime时间,阻塞队列还为空,线程池就会让该工人下岗了,但对于这个下岗来说,有一个限制就是corePoolSize,所以这还得看那些工人的命,要是某一个工人去阻塞队列中拿任务,没有拿到,并且此时总的工人数量大于和核心工人数量,就让该工人下岗,如果小于核心工人的数量,那算这个工人命好没有被迫下岗(这也就是我前面说的,corepollSize的作用就是用来维持工人的数量的,没有什么核心工人这一说,也就是核心线程。只是说线程池要留够corepollSize这个数量的工人,至于留谁那看工人的命了,也就是刚才说过的。).
那么现在的局面也就回到了我们 情况2 的局面。

这就是整个线程池的工作原理,也是一个不断轮回的过程,直到线程池关闭,线程池关闭也讲究善后工作,那就是对这些工作的工人和任务的处理了。善后工作这篇文章不做说明,可能会在之后的博文中进行讲解,拒绝策略方面的内容。

对于我上面说的那几种情况,你们可以对照源码多走几遍,就意思自己采用手工过程,验证一下上边的情况。
对于线程池工作的原理,上边就是我看源码总结出来的,下面我们跟着源码再大致看看。

源码解析;

入口方法execute(Runnable)

 public void execute(Runnable command) {if (command == null)//这判断不用多说,你们肯定都晓得,throw new NullPointerException();int c = ctl.get();
//这个ctl是 AtomicInteger这个类的一个对象,这个类中有一个int value的成员,用这个AtomicInteger其实想用value,
//ctl.get()返回的就是value值,同时这个值在我们new ThreadPoolExecutor的时候默认值为-1 << 29
//采用这个值,一是想采用value的前三位表示线程池的状态,后29位表示线程池中工人的个数。if (workerCountOf(c) < corePoolSize) {//workerCountOf(c)就是去除value就是取出value的后29 判断工人数量。//当前的工人的数量还没有到核心工人数量这个阀值的时候//采用addWorker创建一个工人,来执行任务。 //下面会说到这个addworker方法if (addWorker(command, true))//return;c = ctl.get();}//当目前工人的数量大于核心工人数量的时候,直接到这个判断// isRunning(c)通过value判断线程池的状态,是否在运行状态,在运行转状态才有后话//线程池处于运行状态,将新任务直接加入到阻塞队列中      if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//这里会出现一个情况就是,当加入新任务成功之后,此时线程池的状态不再为运行状态,那就意味值线程池要关闭。//那就试图将新添加的这个任务从阻塞队列中移除,如果删除成功,就执行拒绝策略(线程池关闭的善后工作)//如果删除失败,也没关系,反正线程池已经不工作,对于阻塞队列中的任务也会进行善后处理的。if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//当线程池中工人的数量大于核心工人数量,并且,阻塞队列也满的情况下//创建新的工人执行任务,addWorker(command, false)返回true//如果当前总的工人数量大于线程池最大工人的数量,那么此时创建工人就会失败,addWorker(command, false)返回false,则执行拒绝策略。       else if (!addWorker(command, false))reject(command);}

addworker()

private boolean addWorker(Runnable firstTask, boolean core) {retry://这个外层for循环就是用来判断当前是否满足创建工人的条件for (;;) {int c = ctl.get();int rs = runStateOf(c);//rs为当前线程池的状态//如果当前线程池的状态为关闭状态,并且阻塞队列为空的话,就不满足创建一个新工人的条件,//显然只要线程池的状态为Running状态,也就是运行态,当然不会进入到这个判断,也就离创建工人的可能性更进了一步if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//判断当前线程池中的工人的数量int wc = workerCountOf(c);//对于下面这个判断是针对不同的情况,当大于核心工人数量,返回false,表明当前添加的任务不再创建工人执行,而是加入阻塞队列汇中//当大于最大工人的数量的时候,返回false,那时候说明要执行拒绝策略 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;//程序到这里表明可以创建工人的条件满足,将ctl中的value值加一,if (compareAndIncrementWorkerCount(c))break retry;//直接跳出外层的forc = ctl.get();  if (runStateOf(c) != rs)continue retry;}//外层for循环结束//程序到这里表明创新工人的条件成立,可以创建工人了boolean workerStarted = false;                boolean workerAdded = false;Worker w = null;                       try {//将任务交给工人,并且创建一个工人,Worker类继承了Runnable//将任务交个工人,那么会将firstTask赋值为Worker中的 Runnable firstTask成员。//由于worker本身是一个Runnable所以,在创建工人的时候,工人将自己本身赋值给Worker中的Thread thread成员,//worker中的run方法调用了runWorker(this),               w = new Worker(firstTask);final Thread t = w.thread;//w.thread,就是工人本身。在之后你们看到的他 t.start//就说明工人等待机会执行任务。那么t.start 会有一个线程的开启,并且之后会执行t中的run方法,因为上边说了t就是worker对象本身//所以会执行runworker(this)方法,表明工人开始执行任务了,而this就是worker对象//而在runworker(this)方法中就执行了this中的firsttask的run方法,也就是该woker对象所拥有的任务firstTask的run方法,//也就是真正执行了任务firstTask。if (t != null) {final ReentrantLock mainLock = this.mainLock;//这个锁不用说了,因为线程中有一个变量, HashSet<Worker> workers  //如果工人创建成功的话,需要将工人加入到这个workers中,为了多线程安全的问题,采用mainLock.lock(); 保证在添加工人和删除工人的同步。  mainLock.lock();try {            int rs = runStateOf(ctl.get());//获取当前的线程池的状态,if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){if (t.isAlive())throw new IllegalThreadStateException();//将工人添加到workers中               workers.add(w);int s = workers.size();if (s > largestPoolSize)//largestPoolSize默认为零,添加工人的时候,对其值进行更改记录。largestPoolSize = s;workerAdded = true;}} finally {//释放锁的过程,让其他想要添加或者删除工人的线程可以持有锁进行添加或者删除。mainLock.unlock();}//当工人添加好之后//t.start就是创建一个线程,在上面 new woker的时候已经说过了//等待线程运行的时候,会执行woker中的firstTask的run方法。相当于真正执行任务if (workerAdded) {t.start();//不是立马执行任务,先创键线程,等待cpu调度线程执行任务,我也比作是工人执行任务。workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

runWoker()
runwoker在前面创建工人的时候,已经提到过了。当创建完工人之后有一个t.start 的过程,那么这个 t 就是Woker对象,t.start在之后创建的线程会在之后运行会调用 t 中run方法,也就是Woker中的run方法,也就会运行runwoker()并且参数就是Woker对象

private final class Worker extends AbstractQueuedSynchronizer  implements Runnable{public void run() {runWorker(this);}
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();//w.firstTask就是woker中的任务,也就是工人携带的任务,//当一个线程运行runworker的时候,就会执行工人携带的任务,也相当于工人执行了任务。Runnable task = w.firstTask;//先将工人携带的任务设置为null,然后执行任务w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//当前线程执行完一个task,会在循环采用getTask在阻塞队列中拿任务执行,//也相当于一个工人执行完任务,再拿任务执行。while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {//在线程中此方法并没有具体的实现,空方法,不必管beforeExecute(wt, task);Throwable thrown = null;try {//执行task.run方法,也是真正执行任务了task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//线程池并没有实现,空方法afterExecute(task, thrown);}} finally {task = null;//当前线程执行完一个任务,也相当于一个工人执行完任务//工人记录自己完成的任务数量。w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//当上边的while循环结束,程序就会运行到这里//让工人下岗的操作,相当于将工人在workers中删除processWorkerExit(w, completedAbruptly);//while循环结束相当于是,在取任务的时候没取到,返回了null//上边的while循环才会结束//下面我们就来看看getTask方法。   }}

getTask();

 private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();int rs = runStateOf(c);//获取线程池的状态   if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//当线程池为关闭状态的时候 先通过decrementWorkerCount();将工人数量减一//然后返回null,那么runwoker中的while循环就会结束,并且删除工人decrementWorkerCount();return null;}// 获取线程池中工人的数量。  int wc = workerCountOf(c);//allowCoreThreadTimeOut默认是false,表示不允许核心数量的工人等待超时而进行下岗。//所以当工人的数量小于核心工人的数量的时候,timed = false//当工人的数量大于核心工人的数量的时候,timed = trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//在这里就是决定工人是否下岗的条件//当此时工人总数量大于核心工人的数量的时候,并且由于工人在阻塞队列中没有取到任何的任务,//那么此时满足这个if的判断,就会将工人的数量减一,并且返回null//那么在runwoker的while循环就会结束,就会让工人下岗if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}//当timed为true的时候,表明此时的工人数量大于核心工人的数量,//此时对于任何工人来说采用poll,原因是如果当阻塞队列中没有任务的时候,这些取任务的工人还有等待的时机//当等待时间到了,还没有获取到任务的话,将返回null,这样timedOut = true;那么在次循环的时候,就会将获取任务的工人删除,//因为当工人数量大于核心工人数量,并且阻塞队列中没有任务,线程池没有必要维持那么多工人,维持核心数量个数的工人就好。//当timed为false的时候表明此时的工人数量小于核心工人的数量,//采用take的原因是,如果此时由工人去阻塞队列中拿任务,但是阻塞对列中没有任务的时候,会将当前线程阻塞,//也就是是说工人只是暂停工作,当阻塞队列中有任务加入的话,当前线程接着运行从阻塞队列中拿任务执行//因为线程池要维持核心数量的工人数,此时工人的数量小于核心工人数量,所以不会像poll一样返回null,然后剔除这个工人,只是将其暂停。try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;}catch (InterruptedException retry) {timedOut = false;}}}

这篇关于ThreadPoolExecutor线程池 —————— 开开开山怪的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/687024

相关文章

线程的四种操作

所属专栏:Java学习        1. 线程的开启 start和run的区别: run:描述了线程要执行的任务,也可以称为线程的入口 start:调用系统函数,真正的在系统内核中创建线程(创建PCB,加入到链表中),此处的start会根据不同的系统,分别调用不同的api,创建好之后的线程,再单独去执行run(所以说,start的本质是调用系统api,系统的api

java线程深度解析(六)——线程池技术

http://blog.csdn.net/Daybreak1209/article/details/51382604 一种最为简单的线程创建和回收的方法: [html]  view plain copy new Thread(new Runnable(){                @Override               public voi

java线程深度解析(五)——并发模型(生产者-消费者)

http://blog.csdn.net/Daybreak1209/article/details/51378055 三、生产者-消费者模式     在经典的多线程模式中,生产者-消费者为多线程间协作提供了良好的解决方案。基本原理是两类线程,即若干个生产者和若干个消费者,生产者负责提交用户请求任务(到内存缓冲区),消费者线程负责处理任务(从内存缓冲区中取任务进行处理),两类线程之

java线程深度解析(四)——并发模型(Master-Worker)

http://blog.csdn.net/daybreak1209/article/details/51372929 二、Master-worker ——分而治之      Master-worker常用的并行模式之一,核心思想是由两个进程协作工作,master负责接收和分配任务,worker负责处理任务,并把处理结果返回给Master进程,由Master进行汇总,返回给客

java线程深度解析(二)——线程互斥技术与线程间通信

http://blog.csdn.net/daybreak1209/article/details/51307679      在java多线程——线程同步问题中,对于多线程下程序启动时出现的线程安全问题的背景和初步解决方案已经有了详细的介绍。本文将再度深入解析对线程代码块和方法的同步控制和多线程间通信的实例。 一、再现多线程下安全问题 先看开启两条线程,分别按序打印字符串的

java线程深度解析(一)——java new 接口?匿名内部类给你答案

http://blog.csdn.net/daybreak1209/article/details/51305477 一、内部类 1、内部类初识 一般,一个类里主要包含类的方法和属性,但在Java中还提出在类中继续定义类(内部类)的概念。 内部类的定义:类的内部定义类 先来看一个实例 [html]  view plain copy pu

C#线程系列(1):BeginInvoke和EndInvoke方法

一、线程概述 在操作系统中一个进程至少要包含一个线程,然后,在某些时候需要在同一个进程中同时执行多项任务,或是为了提供程序的性能,将要执行的任务分解成多个子任务执行。这就需要在同一个进程中开启多个线程。我们使用 C# 编写一个应用程序(控制台或桌面程序都可以),然后运行这个程序,并打开 windows 任务管理器,这时我们就会看到这个应用程序中所含有的线程数,如下图所示。

71-java 导致线程上下文切换的原因

Java中导致线程上下文切换的原因通常包括: 线程时间片用完:当前线程的时间片用完,操作系统将其暂停,并切换到另一个线程。 线程被优先级更高的线程抢占:操作系统根据线程优先级决定运行哪个线程。 线程进入等待状态:如线程执行了sleep(),wait(),join()等操作,使线程进入等待状态或阻塞状态,释放CPU。 线程占用CPU时间过长:如果线程执行了大量的I/O操作,而不是CPU计算

使用条件变量实现线程同步:C++实战指南

使用条件变量实现线程同步:C++实战指南 在多线程编程中,线程同步是确保程序正确性和稳定性的关键。条件变量(condition variable)是一种强大的同步原语,用于在线程之间进行协调,避免数据竞争和死锁。本文将详细介绍如何在C++中使用条件变量实现线程同步,并提供完整的代码示例和详细的解释。 什么是条件变量? 条件变量是一种同步机制,允许线程在某个条件满足之前进入等待状态,并在条件满

pythons强行杀掉线程的方法

使用ctypes强行杀掉线程 import threading import time import inspect import ctypes def _async_raise(tid, exctype): """raises the exception, performs cleanup if needed""" tid = ctypes.c_long(tid) if n