你真的搞懂Java 线程池了???未必吧!

2024-02-12 23:40
文章标签 java 线程 真的 搞懂 未必

本文主要是介绍你真的搞懂Java 线程池了???未必吧!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件)则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么要使用线程池

当系统创建一个新的线程,因为它涉及到与操作系统的交互,所以它的成本是比较高的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。当程序中需要使用存活周期很短的线程时,应该考虑使用线程池。

线程池的特点:

  1. 创建一个线程池,该线程池重用在一个共享的无界队列上运行的固定数量的线程。

  2. 在任何时候,在大多数情况下线程都是活动的处理任务。

  3. 如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有一个线程可用为止。

  4. 如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,则需要一个新线程来替代它

  5. 池中的线程将一直存在,直到显式结束。

使用线程池执行线程任务的步骤

  1. 调用 Executors 类的静态方法创建 ExecutorService 对象

  2. 创建一个类,实现 Runnable 接口

  3. 调用 ExecutorService 对象的 execute() 方法来执行 Runnable 接口实例

  4. 最后当不再使用时,调用 ExecutorService 对象的 shutdown() 方法关闭线程池

Executors 是一个工厂类,主要用来创建 ExecutorService:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

用 newFixedThreadPool() 这个静态方法用来创建一个可重用固定线程数的线程池。

四、示例代码

// 创建一个实现Runnable接口的类 Thread1
class Thread1 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 == 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}// 创建一个实现Runnable接口的类 Thread2
class Thread2 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 != 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}public class ThreadPool {public static void main(String[] args) {// 创建一个固定线程数的的线程池ExecutorService pool = Executors.newFixedThreadPool(10);// 向线程中提交两个任务,执行指定的线程的操作。需要提供实现Runnable接口实现类的对象pool.execute(new Thread1());pool.execute(new Thread2());// 关闭线程池pool.shutdown();}
}

运行结果:

五、剖析 ThreadPoolExecutor 底层源码

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

1、创建一个线程池时的参数

  • corePoolSize 表示核心池的大小

当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。

  • maximumPoolSize 表示线程池的最大数量

如果队列(workQueue))满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。如果等待执行的线程数 大于 maximumPoolSize,缓存在队列中;如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。

  • keepAliveTime 表示线程活动的保持时间

当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;当一个非核心线程,如果不干活 (闲置状态) 的时长超过这个参数所设定的时长,就会被销毁掉。

  • TimeUnit 表示线程活动保持时间的单位

它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS

  • workQueue 表示线程池中存放被提交但尚未被执行的任务的队列

维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

  • threadFactory 用于设置创建线程的工厂

它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。

  • handler 表示拒绝处理策略

线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。

从 JDK 源码里面可以看到:

其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。

使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。

ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;// 设置核心池的大小pool1.setCorePoolSize(15);// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止pool1.setKeepAliveTime(60000, TimeUnit.HOURS);

Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象。

ThreadPoolExecutor 继承了 AbstractExecutorService:

public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 表示线程池数量的位数,很明显是29,Integer.SIZE=32private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程池最大数量,2^29 - 1private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}/*** The queue used for holding tasks and handing off to worker* threads.  We do not require that workQueue.poll() returning* null necessarily means that workQueue.isEmpty(), so rely* solely on isEmpty to see if the queue is empty (which we must* do for example when deciding whether to transition from* SHUTDOWN to TIDYING).  This accommodates special-purpose* queues such as DelayQueues for which poll() is allowed to* return null even if it may later return non-null when delays* expire.*/// 用于存放线程任务的阻塞队列private final BlockingQueue<Runnable> workQueue;/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/// 重入锁private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/// 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/// 等待条件支持终止private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;/** All user control parameters are declared as volatiles so that* ongoing actions are based on freshest values, but without need* for locking, since no internal invariants depend on them* changing synchronously with respect to other actions.*//*** Factory for new threads. All threads are created using this* factory (via method addWorker).  All callers must be prepared* for addWorker to fail, which may reflect a system or user's* policy limiting the number of threads.  Even though it is not* treated as an error, failure to create threads may result in* new tasks being rejected or existing ones remaining stuck in* the queue.** We go further and preserve pool invariants even in the face of* errors such as OutOfMemoryError, that might be thrown while* trying to create threads.  Such errors are rather common due to* the need to allocate a native stack in Thread.start, and users* will want to perform clean pool shutdown to clean up.  There* will likely be enough memory available for the cleanup code to* complete without encountering yet another OutOfMemoryError.*/// 创建新线程的线程工厂private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/// 饱和策略private volatile RejectedExecutionHandler handler;

总结:workerCount:表示有效的线程数目 runState:表示线程池里线程的运行状态

2、重要方法

① execute() 方法

// 调用execute方法将线程提交到线程池中 public void execute(Runnable command) {// 如果执行的任务为空,则会抛出空指针异常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1\. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2\. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3\. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取线程池的控制状态int c = ctl.get();// 如果workerCount值小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 添加任务到worker集合当中,成功的话返回if (addWorker(command, true))return;// 如果失败,再次获取线程池的控制状态c = ctl.get();}// 如果corePoolSize已经满了,则需要加入到阻塞队列// 判断线程池的状态以及是否可以往阻塞队列中继续添加runnableif (isRunning(c) && workQueue.offer(command)) {// 获取线程池的状态int recheck = ctl.get();// 再次检查状态,线程池不处于RUNNING状态,将任务从workQueue队列中移除if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果此时队列已满,则会采取相应的拒绝策略else if (!addWorker(command, false))reject(command);} 

总结:往 corePoolSize 中加入任务进行执行 当 corePoolSize 满时往阻塞队列中加入任务 阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略

② addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {// 外部循环标志retry:for (;;) {// 获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取workerCountint wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 跳出循环    if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 实例化worker对象w = new Worker(firstTask);// 获取worker的线程final Thread t = w.thread;// 如果获取的线程不为空if (t != null) {// 初始线程池的锁final ReentrantLock mainLock = this.mainLock;// 得到锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 获取锁后再次检查,获取线程池runStateint rs = runStateOf(ctl.get());// 判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否启动if (t.isAlive()) // precheck that t is startable// 如果未启动存活,则抛出异常throw new IllegalThreadStateException();// 往corePoolSize中加入任务    workers.add(w);// 获取workers集合的大小int s = workers.size();// 如果大小超过largestPoolSizeif (s > largestPoolSize)// 重新设置线程池拥有最大线程数的大小largestPoolSize = s;// 改变状态    workerAdded = true;}} finally {// 释放锁mainLock.unlock();}if (workerAdded) {// 运行t.start();// 改变状态workerStarted = true;}}} finally {// 如果worker没有启动成功if (! workerStarted)addWorkerFailed(w);}// 返回worker是否成功启动的标记return workerStarted;
} 

使用线程池的好处

  • 降低资源消耗 。重复利用线程池中已经创建好的线程,不需要每次都创建,降低创建和销毁造成的消耗。

  • 提高响应的速度。当任务分配下来时,任务无需等到创建线程就能被执行,减少了创建线程的时间。

  • 方便进行线程管理。线程无限制的被创建,会占用系统资源并且还会降低系统的稳定性。使用线程池可以进行统一管理,设置核心池的大小,设置线程没有任务时最多保持多长时间。


作者:superstar001

链接:

https://zhuanlan.zhihu.com/p/259324524

这篇关于你真的搞懂Java 线程池了???未必吧!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/703928

相关文章

Java编译生成多个.class文件的原理和作用

《Java编译生成多个.class文件的原理和作用》作为一名经验丰富的开发者,在Java项目中执行编译后,可能会发现一个.java源文件有时会产生多个.class文件,从技术实现层面详细剖析这一现象... 目录一、内部类机制与.class文件生成成员内部类(常规内部类)局部内部类(方法内部类)匿名内部类二、

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Springboot @Autowired和@Resource的区别解析

《Springboot@Autowired和@Resource的区别解析》@Resource是JDK提供的注解,只是Spring在实现上提供了这个注解的功能支持,本文给大家介绍Springboot@... 目录【一】定义【1】@Autowired【2】@Resource【二】区别【1】包含的属性不同【2】@

springboot循环依赖问题案例代码及解决办法

《springboot循环依赖问题案例代码及解决办法》在SpringBoot中,如果两个或多个Bean之间存在循环依赖(即BeanA依赖BeanB,而BeanB又依赖BeanA),会导致Spring的... 目录1. 什么是循环依赖?2. 循环依赖的场景案例3. 解决循环依赖的常见方法方法 1:使用 @La

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

Elasticsearch 在 Java 中的使用教程

《Elasticsearch在Java中的使用教程》Elasticsearch是一个分布式搜索和分析引擎,基于ApacheLucene构建,能够实现实时数据的存储、搜索、和分析,它广泛应用于全文... 目录1. Elasticsearch 简介2. 环境准备2.1 安装 Elasticsearch2.2 J

Java中的String.valueOf()和toString()方法区别小结

《Java中的String.valueOf()和toString()方法区别小结》字符串操作是开发者日常编程任务中不可或缺的一部分,转换为字符串是一种常见需求,其中最常见的就是String.value... 目录String.valueOf()方法方法定义方法实现使用示例使用场景toString()方法方法

Java中List的contains()方法的使用小结

《Java中List的contains()方法的使用小结》List的contains()方法用于检查列表中是否包含指定的元素,借助equals()方法进行判断,下面就来介绍Java中List的c... 目录详细展开1. 方法签名2. 工作原理3. 使用示例4. 注意事项总结结论:List 的 contain

Java实现文件图片的预览和下载功能

《Java实现文件图片的预览和下载功能》这篇文章主要为大家详细介绍了如何使用Java实现文件图片的预览和下载功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... Java实现文件(图片)的预览和下载 @ApiOperation("访问文件") @GetMapping("

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis