Java 线程知识笔记 (三) Executor与ThreadPool 其一

2024-01-10 09:58

本文主要是介绍Java 线程知识笔记 (三) Executor与ThreadPool 其一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

到目前为止,基本上已经把Java中线程有关的基础知识说了个大概。考虑到开启线程是一个开销很大的事情,如果每个子线程都在使用的时候做初始化等待分配等等一系列的事情,就会非常的影响程序的响应速度。为了解决这个问题,Java也是从1.5开始提出了线程池(ThreadPool)的概念。本篇就会着重对线程池进行一个讲解。更多线程知识内容请点击【Java 多线程和锁知识笔记系列】

效率的比较

前面已经说了,启动线程、分配CPU时间片、销毁是一个很耗资源的工作。为了给大家一个直观的概念,我们用两个几乎一样的方法做个例子:方法一使用构建一万个子线程的方式去往list里写入10000条数据;方法二则是使用构建一个线程池,使用线程池往里面list里写入10000条数据。我们可以对比它们的执行时间。

public class ThreadPool1 {//普通线程public static void threads() throws InterruptedException{Long start=System.currentTimeMillis();List<Integer> list=new ArrayList<>();Random random=new Random();for (int i = 0; i <10000 ; i++) {Thread t=new Thread(()->list.add(random.nextInt()));t.start();t.join();}System.out.println("耗时"+ (System.currentTimeMillis()-start));System.out.println(list.size());}//线程池public static void executors() throws InterruptedException {Long start=System.currentTimeMillis();List<Integer> list=new ArrayList<>();Random random=new Random();ExecutorService service= Executors.newSingleThreadExecutor();for (int i = 0; i <10000 ; i++) {service.execute(() -> list.add(random.nextInt()));}service.shutdown();service.awaitTermination(1000, TimeUnit.HOURS);System.out.println("耗时"+ (System.currentTimeMillis()-start));System.out.println(list.size());}public static void main(String[] args) throws InterruptedException {System.out.println("普通线程执行效率:");threads();System.out.println("线程池执行效率:");executors();}
}
运行输出:
普通线程执行效率:
耗时4977
10000
线程池执行效率:
耗时54
10000

可以看到两个方法做同样的事情所消耗的时间几乎差了100倍,这个效率的对比还是相当恐怖的,但这个例子也说明了线程越多效率未必越高。之前我们说过线程的执行必须被CPU分配时间分片才能进行。由于我们使用多线程,当频繁的创建销毁线程的时候,会有上下文的切换,而这种切换会浪费很多资源。我们在第一篇的时候分析了当使用join()的时候会阻塞直到当前线程跑完。那么就意味着我们threads()方法中启动了10000个线程,并且这些线程必须一个个等待前面的线程释放资源。而executors()中,其实只启动了一个线程,循环了10000次,效率高下立判。

例子中第二个方法没有用线程池用的是newSingleThreadExecutor()。这是因为如果这个例子直接使用newCachedThreadPool()去做,就难以保证数据的一致性。但是可以理解为newSingleThreadExecutor()使用的是单线程的线程池,因为其底层也是用的线程池ThreadPoolExecutor。不管如何这两个方法返回的都是ExecutorService对象,而使用的都是ThreadPoolExecutor类去创建,那么后面的重点就在这两个东西上了。

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

ThreadPoolExecutor类

研究一个类能做什么首先要看到它的继承关系,然后是它的构造方法。首先我们先看下继承关系。

在这里插入图片描述

继承关系很简单,一条直线下来。顶层是Executor接口,这个接口也是整个线程池体系里的最顶层接口,其中只有一个方法execute(),在线程池相关的各种类里面普遍使用。ExecutorService也是一个接口,这个接口扩展了Executor,其中使用submit()方法把execute()做了一个包装,使得execute()执行以后能够具有返回值,它的实现方法在AbstractExecutorService中,AbstractExecutorService抽象类对ExecutorService接口做了一个简单的实现。之所以这样说是因为线程池体系很大,由于ThreadPoolExecutor继承了AbstractExecutorService,因此它也可以调用submit()方法,submit()有3个不同的重载,下面贴出来其中一个。只是想要说明其实submit()execute()几乎是等同的。

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}

ThreadPoolExecutor初始化

简单分析完继承关系,接着看构造方法,这个类一共有四个构造方法,但是最终都会调用到下面这个里面。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize; //线程池初始化创建的线程数this.maximumPoolSize = maximumPoolSize; //线程池允许同时运行的最大线程数,一旦核心线程满了,且短时间内无法释放,就会在这个范围内分配新线程this.workQueue = workQueue; //线程池的阻塞队列,用于存放超出核心线程数的线程this.keepAliveTime = unit.toNanos(keepAliveTime); //空闲线程的存活时间,给非核心池线程用的this.threadFactory = threadFactory; //工厂this.handler = handler; //错误handler
}

构造方法里面一共有7个参数,他们的功能如下:

  • corePoolSize :核心线程数,池中所保存的线程数,包括空闲线程。
  • maximumPoolSize:池中允许的最大线程数。
  • keepAliveTime: 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit:keepAliveTime 参数的时间单位。
  • workQueue :执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
  • threadFactory:执行程序创建新线程时使用的工厂。
  • handler :当队列和最大线程池都满了之后的饱和策略。

ThreadPoolExecutor#execute方法

初始化做完,就要开始执行execute()方法了。

public void execute(Runnable command) {if (command == null)  //任务是否为nullthrow new NullPointerException(); //为null抛异常/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();  //获取当前线程的状态码if (workerCountOf(c) < corePoolSize) { //如果小于核心数,说明池中还有核心线程if (addWorker(command, true)) //直接调用addWorker()执行return;c = ctl.get(); //如果无法直接执行,更新状态码}if (isRunning(c) && workQueue.offer(command)) {//大于核心数,所有核心处于运行状态,放进队列int recheck = ctl.get(); //重新获取状态码if (! isRunning(recheck) && remove(command))// 再次验证,发现线程池故障了reject(command); //抛出异常,拒绝执行else if (workerCountOf(recheck) == 0)// 重新检查发现核心线程已经有空余了addWorker(null, false); //起一个新线程}else if (!addWorker(command, false)) //没法放进队列,那就直接执行,在非核心线程池中执行reject(command);//都不可以那就拒绝执行(抛异常)
}

进入以后首先判空,然后获取当前线程的状态。官方注释中execute()方法的执行一共分为三种情况。

情况策略
线程数<核心数如果当前运行的线程数小于池的核心数量,尝试给过来的线程创建一个线程作为它的第一个任务。调用addWorker()方法原子性的检查runState和workerCount,并且因此防止不应该添加现成的false告警返回false。
线程数>核心数如果大于或者等于线程池的核心数量,而且可以被放到线程队列里。会首先经过一个double-check,这一步是为了检查是否应该把这个线程添加到队列里(因为上次检查后线程池死了)或者进入这个方法后线程池被关闭了。所以需要recheck状态,并且如果发现任务已经停了回滚队列,或者开启一个新的新城,如果没有一个活着。
线程数>核心数,且无法放到线程队列里如果没有办法把这个任务放到线程队列里,那就直接在非核心池中开启新的线程执行。如果都失败了,可以说线程池死了或者饱和了,那么就抛出异常拒绝执行这个任务,开始执行异常处理。

ThreadPoolExecutor#addWork方法

从上面的分析来看,最终addWorker ()就是执行线程池中线程的方法,那么直接进去里面。

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get(); //获取当前状态码int rs = runStateOf(c); //转换为runStateu运行状态// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&   //检测运行状态! (rs == SHUTDOWN &&  //检测运行状态为SHUTDOWNfirstTask == null &&   //检查传入的当前任务是null! workQueue.isEmpty()))  //检查队列不为null// 如果进入这里说明任务、线程池或者队列有问题,不能执行当前任务return false;for (;;) {int wc = workerCountOf(c); //转化为工作数量if (wc >= CAPACITY || //判断工作数量是不是满足条件wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) //修改状态码break retry; //跳出空转c = ctl.get();  // Re-read ctl //修改失败重新读取状态码if (runStateOf(c) != rs) //如果修改失败continue retry;//继续空转// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;/**下面是执行,后面说**/
}

首先进来发现这是一个for的空转自旋,这一部分是为了做验证用的。首先还是拿到状态rs,然后经过一个状态校验看看线程池是不是死了,如果条件都满足,就返回false说任务干不了。然后又一个自旋用于判断当前正在工作数量和线程池的参数是不是满足,如果满足CAS更新线程数,修改成功跳出执行后面的逻辑,修改失败继续空转再判断一次。

private boolean addWorker(Runnable firstTask, boolean core) {/**接上**/boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); //转换为Worker对象wfinal Thread t = w.thread; //通过Worker对象创建线程if (t != null) { //判空,这个线程就是执行线程final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //加锁,因为workers不是线程安全的所以要加锁try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w); //添加w到一个set,作用仅仅为了统计个数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

通过验证的任务,就到了执行的地方了。首先把任务转为Worker对象,然后利用Worker对象构建线程对象t,经过再次验证线程池依然健在以后,利用t.start()去调取执行方法。

ThreadPoolExecutor.Worker内部类

这里的Worker类是ThreadPoolExecutor的内部类实现了Runnable接口,因此Worker里面也有一个run()方法,就是在此时调用的。此外这个内部类还继承了一个AbstractQueuedSynchronizer类,看名字大概知道是一个队列同步器。其实这是一个用锁实现AQS,同步阻塞器,以后有机会再说。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/**略**//** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}/**略**/
}

进入以后又调用了runWorker(this)方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread(); //获取线程池线程Runnable task = w.firstTask;	//拿到当前任务w.firstTask = null;	//拿到后销毁w.unlock(); // allow interrupts	boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 验证,如果线程池死了,打断运行if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//验证通过往下走beforeExecute(wt, task); //空方法,后续可能增强Throwable thrown = null;try {task.run(); //执行任务逻辑中的run()方法 } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); //空方法,后续可能增强}} finally {task = null;//销毁执行成功的任务w.completedTasks++;//完成数更新w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

进入后,首先获取到当前线程,后面用来判断当前的线程池不是还活着。然后拿到执行任务task = w.firstTask,验证线程池存活通过以后,直接执行task.run(),这里就是我们创建的线程中run()方法的逻辑调用执行的地方。还有一个值得注意的是while循环中的条件,当task!=null成立时,直接就执行。如果task!=null不成立,则taskgetTask()里面获取到值。从哪里取呢?从线程队列Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(),就是从execute()方法里说到的workQueue里面取。

上面说到,当看到有调用submit()方法时,可以看作调用的就是execute()方法。但是这俩方法还有些不同的:submit()方法是有返回值的是Future,而execute()方法没有返回值,类似于RunnableCallable的区别。当外部调用execute()方法的时候,这里调用task.run()中的taskThreadPool1对象。但是如果换到调用submit()方法的时候,这里的task就会是FutureTask对象去调用run()。其原因已经在上面submit()方法里显示的很清楚了,可以参考博客【Java 线程知识笔记 (二) Callable与Future】里面的FutureTask部分内容。

ThreadPoolExecutor#processWorkerExit收尾

所有上述的内容都执行无误,就进入了processWorkerExit()执行线程退出的过程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

既然是在finally块中,无论如何都会执行到这里。这部分代码就是在判断线程在初始化的时候的存活时间,是否需要remove,是否因为某些原因需要重新执行addWorker()等等内容进行一个收尾的工作。到这一步一个线程在整个线程池中的动作就抛完了。

总结

本篇从一个例子开始梳理了一个线程在线程池里是如何运行的,以及线程池对于线程的执行策略,最后画张图进行一个总结。下一篇【Java 线程知识笔记 (四) Executor与ThreadPool 其二】会对本篇中没有涉及到的一些琐碎内容,比如线程池中状态的变化等等进行一个集中的讲解。
在这里插入图片描述

这篇关于Java 线程知识笔记 (三) Executor与ThreadPool 其一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590406

相关文章

Spring Boot项目部署命令java -jar的各种参数及作用详解

《SpringBoot项目部署命令java-jar的各种参数及作用详解》:本文主要介绍SpringBoot项目部署命令java-jar的各种参数及作用的相关资料,包括设置内存大小、垃圾回收... 目录前言一、基础命令结构二、常见的 Java 命令参数1. 设置内存大小2. 配置垃圾回收器3. 配置线程栈大小

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S

Spring中配置ContextLoaderListener方式

《Spring中配置ContextLoaderListener方式》:本文主要介绍Spring中配置ContextLoaderListener方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录Spring中配置ContextLoaderLishttp://www.chinasem.cntene

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.

idea maven编译报错Java heap space的解决方法

《ideamaven编译报错Javaheapspace的解决方法》这篇文章主要为大家详细介绍了ideamaven编译报错Javaheapspace的相关解决方法,文中的示例代码讲解详细,感兴趣的... 目录1.增加 Maven 编译的堆内存2. 增加 IntelliJ IDEA 的堆内存3. 优化 Mave

Java String字符串的常用使用方法

《JavaString字符串的常用使用方法》String是JDK提供的一个类,是引用类型,并不是基本的数据类型,String用于字符串操作,在之前学习c语言的时候,对于一些字符串,会初始化字符数组表... 目录一、什么是String二、如何定义一个String1. 用双引号定义2. 通过构造函数定义三、St