Java 线程知识笔记 (三) Executor与ThreadPool 其一

2024-01-10 09:58

本文主要是介绍Java 线程知识笔记 (三) Executor与ThreadPool 其一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

到目前为止,基本上已经把Java中线程有关的基础知识说了个大概。考虑到开启线程是一个开销很大的事情,如果每个子线程都在使用的时候做初始化等待分配等等一系列的事情,就会非常的影响程序的响应速度。为了解决这个问题,Java也是从1.5开始提出了线程池(ThreadPool)的概念。本篇就会着重对线程池进行一个讲解。更多线程知识内容请点击【Java 多线程和锁知识笔记系列】

效率的比较

前面已经说了,启动线程、分配CPU时间片、销毁是一个很耗资源的工作。为了给大家一个直观的概念,我们用两个几乎一样的方法做个例子:方法一使用构建一万个子线程的方式去往list里写入10000条数据;方法二则是使用构建一个线程池,使用线程池往里面list里写入10000条数据。我们可以对比它们的执行时间。

public class ThreadPool1 {//普通线程public static void threads() throws InterruptedException{Long start=System.currentTimeMillis();List<Integer> list=new ArrayList<>();Random random=new Random();for (int i = 0; i <10000 ; i++) {Thread t=new Thread(()->list.add(random.nextInt()));t.start();t.join();}System.out.println("耗时"+ (System.currentTimeMillis()-start));System.out.println(list.size());}//线程池public static void executors() throws InterruptedException {Long start=System.currentTimeMillis();List<Integer> list=new ArrayList<>();Random random=new Random();ExecutorService service= Executors.newSingleThreadExecutor();for (int i = 0; i <10000 ; i++) {service.execute(() -> list.add(random.nextInt()));}service.shutdown();service.awaitTermination(1000, TimeUnit.HOURS);System.out.println("耗时"+ (System.currentTimeMillis()-start));System.out.println(list.size());}public static void main(String[] args) throws InterruptedException {System.out.println("普通线程执行效率:");threads();System.out.println("线程池执行效率:");executors();}
}
运行输出:
普通线程执行效率:
耗时4977
10000
线程池执行效率:
耗时54
10000

可以看到两个方法做同样的事情所消耗的时间几乎差了100倍,这个效率的对比还是相当恐怖的,但这个例子也说明了线程越多效率未必越高。之前我们说过线程的执行必须被CPU分配时间分片才能进行。由于我们使用多线程,当频繁的创建销毁线程的时候,会有上下文的切换,而这种切换会浪费很多资源。我们在第一篇的时候分析了当使用join()的时候会阻塞直到当前线程跑完。那么就意味着我们threads()方法中启动了10000个线程,并且这些线程必须一个个等待前面的线程释放资源。而executors()中,其实只启动了一个线程,循环了10000次,效率高下立判。

例子中第二个方法没有用线程池用的是newSingleThreadExecutor()。这是因为如果这个例子直接使用newCachedThreadPool()去做,就难以保证数据的一致性。但是可以理解为newSingleThreadExecutor()使用的是单线程的线程池,因为其底层也是用的线程池ThreadPoolExecutor。不管如何这两个方法返回的都是ExecutorService对象,而使用的都是ThreadPoolExecutor类去创建,那么后面的重点就在这两个东西上了。

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

ThreadPoolExecutor类

研究一个类能做什么首先要看到它的继承关系,然后是它的构造方法。首先我们先看下继承关系。

在这里插入图片描述

继承关系很简单,一条直线下来。顶层是Executor接口,这个接口也是整个线程池体系里的最顶层接口,其中只有一个方法execute(),在线程池相关的各种类里面普遍使用。ExecutorService也是一个接口,这个接口扩展了Executor,其中使用submit()方法把execute()做了一个包装,使得execute()执行以后能够具有返回值,它的实现方法在AbstractExecutorService中,AbstractExecutorService抽象类对ExecutorService接口做了一个简单的实现。之所以这样说是因为线程池体系很大,由于ThreadPoolExecutor继承了AbstractExecutorService,因此它也可以调用submit()方法,submit()有3个不同的重载,下面贴出来其中一个。只是想要说明其实submit()execute()几乎是等同的。

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}

ThreadPoolExecutor初始化

简单分析完继承关系,接着看构造方法,这个类一共有四个构造方法,但是最终都会调用到下面这个里面。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize; //线程池初始化创建的线程数this.maximumPoolSize = maximumPoolSize; //线程池允许同时运行的最大线程数,一旦核心线程满了,且短时间内无法释放,就会在这个范围内分配新线程this.workQueue = workQueue; //线程池的阻塞队列,用于存放超出核心线程数的线程this.keepAliveTime = unit.toNanos(keepAliveTime); //空闲线程的存活时间,给非核心池线程用的this.threadFactory = threadFactory; //工厂this.handler = handler; //错误handler
}

构造方法里面一共有7个参数,他们的功能如下:

  • corePoolSize :核心线程数,池中所保存的线程数,包括空闲线程。
  • maximumPoolSize:池中允许的最大线程数。
  • keepAliveTime: 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit:keepAliveTime 参数的时间单位。
  • workQueue :执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
  • threadFactory:执行程序创建新线程时使用的工厂。
  • handler :当队列和最大线程池都满了之后的饱和策略。

ThreadPoolExecutor#execute方法

初始化做完,就要开始执行execute()方法了。

public void execute(Runnable command) {if (command == null)  //任务是否为nullthrow new NullPointerException(); //为null抛异常/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();  //获取当前线程的状态码if (workerCountOf(c) < corePoolSize) { //如果小于核心数,说明池中还有核心线程if (addWorker(command, true)) //直接调用addWorker()执行return;c = ctl.get(); //如果无法直接执行,更新状态码}if (isRunning(c) && workQueue.offer(command)) {//大于核心数,所有核心处于运行状态,放进队列int recheck = ctl.get(); //重新获取状态码if (! isRunning(recheck) && remove(command))// 再次验证,发现线程池故障了reject(command); //抛出异常,拒绝执行else if (workerCountOf(recheck) == 0)// 重新检查发现核心线程已经有空余了addWorker(null, false); //起一个新线程}else if (!addWorker(command, false)) //没法放进队列,那就直接执行,在非核心线程池中执行reject(command);//都不可以那就拒绝执行(抛异常)
}

进入以后首先判空,然后获取当前线程的状态。官方注释中execute()方法的执行一共分为三种情况。

情况策略
线程数<核心数如果当前运行的线程数小于池的核心数量,尝试给过来的线程创建一个线程作为它的第一个任务。调用addWorker()方法原子性的检查runState和workerCount,并且因此防止不应该添加现成的false告警返回false。
线程数>核心数如果大于或者等于线程池的核心数量,而且可以被放到线程队列里。会首先经过一个double-check,这一步是为了检查是否应该把这个线程添加到队列里(因为上次检查后线程池死了)或者进入这个方法后线程池被关闭了。所以需要recheck状态,并且如果发现任务已经停了回滚队列,或者开启一个新的新城,如果没有一个活着。
线程数>核心数,且无法放到线程队列里如果没有办法把这个任务放到线程队列里,那就直接在非核心池中开启新的线程执行。如果都失败了,可以说线程池死了或者饱和了,那么就抛出异常拒绝执行这个任务,开始执行异常处理。

ThreadPoolExecutor#addWork方法

从上面的分析来看,最终addWorker ()就是执行线程池中线程的方法,那么直接进去里面。

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get(); //获取当前状态码int rs = runStateOf(c); //转换为runStateu运行状态// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&   //检测运行状态! (rs == SHUTDOWN &&  //检测运行状态为SHUTDOWNfirstTask == null &&   //检查传入的当前任务是null! workQueue.isEmpty()))  //检查队列不为null// 如果进入这里说明任务、线程池或者队列有问题,不能执行当前任务return false;for (;;) {int wc = workerCountOf(c); //转化为工作数量if (wc >= CAPACITY || //判断工作数量是不是满足条件wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) //修改状态码break retry; //跳出空转c = ctl.get();  // Re-read ctl //修改失败重新读取状态码if (runStateOf(c) != rs) //如果修改失败continue retry;//继续空转// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;/**下面是执行,后面说**/
}

首先进来发现这是一个for的空转自旋,这一部分是为了做验证用的。首先还是拿到状态rs,然后经过一个状态校验看看线程池是不是死了,如果条件都满足,就返回false说任务干不了。然后又一个自旋用于判断当前正在工作数量和线程池的参数是不是满足,如果满足CAS更新线程数,修改成功跳出执行后面的逻辑,修改失败继续空转再判断一次。

private boolean addWorker(Runnable firstTask, boolean core) {/**接上**/boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); //转换为Worker对象wfinal Thread t = w.thread; //通过Worker对象创建线程if (t != null) { //判空,这个线程就是执行线程final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //加锁,因为workers不是线程安全的所以要加锁try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w); //添加w到一个set,作用仅仅为了统计个数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

通过验证的任务,就到了执行的地方了。首先把任务转为Worker对象,然后利用Worker对象构建线程对象t,经过再次验证线程池依然健在以后,利用t.start()去调取执行方法。

ThreadPoolExecutor.Worker内部类

这里的Worker类是ThreadPoolExecutor的内部类实现了Runnable接口,因此Worker里面也有一个run()方法,就是在此时调用的。此外这个内部类还继承了一个AbstractQueuedSynchronizer类,看名字大概知道是一个队列同步器。其实这是一个用锁实现AQS,同步阻塞器,以后有机会再说。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{/**略**//** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}/**略**/
}

进入以后又调用了runWorker(this)方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread(); //获取线程池线程Runnable task = w.firstTask;	//拿到当前任务w.firstTask = null;	//拿到后销毁w.unlock(); // allow interrupts	boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// 验证,如果线程池死了,打断运行if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//验证通过往下走beforeExecute(wt, task); //空方法,后续可能增强Throwable thrown = null;try {task.run(); //执行任务逻辑中的run()方法 } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); //空方法,后续可能增强}} finally {task = null;//销毁执行成功的任务w.completedTasks++;//完成数更新w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

进入后,首先获取到当前线程,后面用来判断当前的线程池不是还活着。然后拿到执行任务task = w.firstTask,验证线程池存活通过以后,直接执行task.run(),这里就是我们创建的线程中run()方法的逻辑调用执行的地方。还有一个值得注意的是while循环中的条件,当task!=null成立时,直接就执行。如果task!=null不成立,则taskgetTask()里面获取到值。从哪里取呢?从线程队列Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(),就是从execute()方法里说到的workQueue里面取。

上面说到,当看到有调用submit()方法时,可以看作调用的就是execute()方法。但是这俩方法还有些不同的:submit()方法是有返回值的是Future,而execute()方法没有返回值,类似于RunnableCallable的区别。当外部调用execute()方法的时候,这里调用task.run()中的taskThreadPool1对象。但是如果换到调用submit()方法的时候,这里的task就会是FutureTask对象去调用run()。其原因已经在上面submit()方法里显示的很清楚了,可以参考博客【Java 线程知识笔记 (二) Callable与Future】里面的FutureTask部分内容。

ThreadPoolExecutor#processWorkerExit收尾

所有上述的内容都执行无误,就进入了processWorkerExit()执行线程退出的过程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

既然是在finally块中,无论如何都会执行到这里。这部分代码就是在判断线程在初始化的时候的存活时间,是否需要remove,是否因为某些原因需要重新执行addWorker()等等内容进行一个收尾的工作。到这一步一个线程在整个线程池中的动作就抛完了。

总结

本篇从一个例子开始梳理了一个线程在线程池里是如何运行的,以及线程池对于线程的执行策略,最后画张图进行一个总结。下一篇【Java 线程知识笔记 (四) Executor与ThreadPool 其二】会对本篇中没有涉及到的一些琐碎内容,比如线程池中状态的变化等等进行一个集中的讲解。
在这里插入图片描述

这篇关于Java 线程知识笔记 (三) Executor与ThreadPool 其一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590406

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

sqlite3 相关知识

WAL 模式 VS 回滚模式 特性WAL 模式回滚模式(Rollback Journal)定义使用写前日志来记录变更。使用回滚日志来记录事务的所有修改。特点更高的并发性和性能;支持多读者和单写者。支持安全的事务回滚,但并发性较低。性能写入性能更好,尤其是读多写少的场景。写操作会造成较大的性能开销,尤其是在事务开始时。写入流程数据首先写入 WAL 文件,然后才从 WAL 刷新到主数据库。数据在开始

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听