你真的搞懂Java 线程池了???未必吧!

2024-02-12 23:40
文章标签 java 线程 真的 搞懂 未必

本文主要是介绍你真的搞懂Java 线程池了???未必吧!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件)则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么要使用线程池

当系统创建一个新的线程,因为它涉及到与操作系统的交互,所以它的成本是比较高的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。当程序中需要使用存活周期很短的线程时,应该考虑使用线程池。

线程池的特点:

  1. 创建一个线程池,该线程池重用在一个共享的无界队列上运行的固定数量的线程。

  2. 在任何时候,在大多数情况下线程都是活动的处理任务。

  3. 如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有一个线程可用为止。

  4. 如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,则需要一个新线程来替代它

  5. 池中的线程将一直存在,直到显式结束。

使用线程池执行线程任务的步骤

  1. 调用 Executors 类的静态方法创建 ExecutorService 对象

  2. 创建一个类,实现 Runnable 接口

  3. 调用 ExecutorService 对象的 execute() 方法来执行 Runnable 接口实例

  4. 最后当不再使用时,调用 ExecutorService 对象的 shutdown() 方法关闭线程池

Executors 是一个工厂类,主要用来创建 ExecutorService:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

用 newFixedThreadPool() 这个静态方法用来创建一个可重用固定线程数的线程池。

四、示例代码

// 创建一个实现Runnable接口的类 Thread1
class Thread1 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 == 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}// 创建一个实现Runnable接口的类 Thread2
class Thread2 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 != 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}public class ThreadPool {public static void main(String[] args) {// 创建一个固定线程数的的线程池ExecutorService pool = Executors.newFixedThreadPool(10);// 向线程中提交两个任务,执行指定的线程的操作。需要提供实现Runnable接口实现类的对象pool.execute(new Thread1());pool.execute(new Thread2());// 关闭线程池pool.shutdown();}
}

运行结果:

五、剖析 ThreadPoolExecutor 底层源码

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

1、创建一个线程池时的参数

  • corePoolSize 表示核心池的大小

当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。

  • maximumPoolSize 表示线程池的最大数量

如果队列(workQueue))满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。如果等待执行的线程数 大于 maximumPoolSize,缓存在队列中;如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。

  • keepAliveTime 表示线程活动的保持时间

当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;当一个非核心线程,如果不干活 (闲置状态) 的时长超过这个参数所设定的时长,就会被销毁掉。

  • TimeUnit 表示线程活动保持时间的单位

它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS

  • workQueue 表示线程池中存放被提交但尚未被执行的任务的队列

维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

  • threadFactory 用于设置创建线程的工厂

它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。

  • handler 表示拒绝处理策略

线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。

从 JDK 源码里面可以看到:

其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。

使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。

ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;// 设置核心池的大小pool1.setCorePoolSize(15);// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止pool1.setKeepAliveTime(60000, TimeUnit.HOURS);

Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象。

ThreadPoolExecutor 继承了 AbstractExecutorService:

public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 表示线程池数量的位数,很明显是29,Integer.SIZE=32private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程池最大数量,2^29 - 1private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}/*** The queue used for holding tasks and handing off to worker* threads.  We do not require that workQueue.poll() returning* null necessarily means that workQueue.isEmpty(), so rely* solely on isEmpty to see if the queue is empty (which we must* do for example when deciding whether to transition from* SHUTDOWN to TIDYING).  This accommodates special-purpose* queues such as DelayQueues for which poll() is allowed to* return null even if it may later return non-null when delays* expire.*/// 用于存放线程任务的阻塞队列private final BlockingQueue<Runnable> workQueue;/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/// 重入锁private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/// 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/// 等待条件支持终止private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;/** All user control parameters are declared as volatiles so that* ongoing actions are based on freshest values, but without need* for locking, since no internal invariants depend on them* changing synchronously with respect to other actions.*//*** Factory for new threads. All threads are created using this* factory (via method addWorker).  All callers must be prepared* for addWorker to fail, which may reflect a system or user's* policy limiting the number of threads.  Even though it is not* treated as an error, failure to create threads may result in* new tasks being rejected or existing ones remaining stuck in* the queue.** We go further and preserve pool invariants even in the face of* errors such as OutOfMemoryError, that might be thrown while* trying to create threads.  Such errors are rather common due to* the need to allocate a native stack in Thread.start, and users* will want to perform clean pool shutdown to clean up.  There* will likely be enough memory available for the cleanup code to* complete without encountering yet another OutOfMemoryError.*/// 创建新线程的线程工厂private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/// 饱和策略private volatile RejectedExecutionHandler handler;

总结:workerCount:表示有效的线程数目 runState:表示线程池里线程的运行状态

2、重要方法

① execute() 方法

// 调用execute方法将线程提交到线程池中 public void execute(Runnable command) {// 如果执行的任务为空,则会抛出空指针异常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1\. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2\. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3\. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取线程池的控制状态int c = ctl.get();// 如果workerCount值小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 添加任务到worker集合当中,成功的话返回if (addWorker(command, true))return;// 如果失败,再次获取线程池的控制状态c = ctl.get();}// 如果corePoolSize已经满了,则需要加入到阻塞队列// 判断线程池的状态以及是否可以往阻塞队列中继续添加runnableif (isRunning(c) && workQueue.offer(command)) {// 获取线程池的状态int recheck = ctl.get();// 再次检查状态,线程池不处于RUNNING状态,将任务从workQueue队列中移除if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果此时队列已满,则会采取相应的拒绝策略else if (!addWorker(command, false))reject(command);} 

总结:往 corePoolSize 中加入任务进行执行 当 corePoolSize 满时往阻塞队列中加入任务 阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略

② addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {// 外部循环标志retry:for (;;) {// 获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取workerCountint wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 跳出循环    if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 实例化worker对象w = new Worker(firstTask);// 获取worker的线程final Thread t = w.thread;// 如果获取的线程不为空if (t != null) {// 初始线程池的锁final ReentrantLock mainLock = this.mainLock;// 得到锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 获取锁后再次检查,获取线程池runStateint rs = runStateOf(ctl.get());// 判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否启动if (t.isAlive()) // precheck that t is startable// 如果未启动存活,则抛出异常throw new IllegalThreadStateException();// 往corePoolSize中加入任务    workers.add(w);// 获取workers集合的大小int s = workers.size();// 如果大小超过largestPoolSizeif (s > largestPoolSize)// 重新设置线程池拥有最大线程数的大小largestPoolSize = s;// 改变状态    workerAdded = true;}} finally {// 释放锁mainLock.unlock();}if (workerAdded) {// 运行t.start();// 改变状态workerStarted = true;}}} finally {// 如果worker没有启动成功if (! workerStarted)addWorkerFailed(w);}// 返回worker是否成功启动的标记return workerStarted;
} 

使用线程池的好处

  • 降低资源消耗 。重复利用线程池中已经创建好的线程,不需要每次都创建,降低创建和销毁造成的消耗。

  • 提高响应的速度。当任务分配下来时,任务无需等到创建线程就能被执行,减少了创建线程的时间。

  • 方便进行线程管理。线程无限制的被创建,会占用系统资源并且还会降低系统的稳定性。使用线程池可以进行统一管理,设置核心池的大小,设置线程没有任务时最多保持多长时间。


作者:superstar001

链接:

https://zhuanlan.zhihu.com/p/259324524

这篇关于你真的搞懂Java 线程池了???未必吧!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/703928

相关文章

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

Linux线程之线程的创建、属性、回收、退出、取消方式

《Linux线程之线程的创建、属性、回收、退出、取消方式》文章总结了线程管理核心知识:线程号唯一、创建方式、属性设置(如分离状态与栈大小)、回收机制(join/detach)、退出方法(返回/pthr... 目录1. 线程号2. 线程的创建3. 线程属性4. 线程的回收5. 线程的退出6. 线程的取消7.

Linux下进程的CPU配置与线程绑定过程

《Linux下进程的CPU配置与线程绑定过程》本文介绍Linux系统中基于进程和线程的CPU配置方法,通过taskset命令和pthread库调整亲和力,将进程/线程绑定到特定CPU核心以优化资源分配... 目录1 基于进程的CPU配置1.1 对CPU亲和力的配置1.2 绑定进程到指定CPU核上运行2 基于

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与

Spring Boot spring-boot-maven-plugin 参数配置详解(最新推荐)

《SpringBootspring-boot-maven-plugin参数配置详解(最新推荐)》文章介绍了SpringBootMaven插件的5个核心目标(repackage、run、start... 目录一 spring-boot-maven-plugin 插件的5个Goals二 应用场景1 重新打包应用

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Java中读取YAML文件配置信息常见问题及解决方法

《Java中读取YAML文件配置信息常见问题及解决方法》:本文主要介绍Java中读取YAML文件配置信息常见问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录1 使用Spring Boot的@ConfigurationProperties2. 使用@Valu

创建Java keystore文件的完整指南及详细步骤

《创建Javakeystore文件的完整指南及详细步骤》本文详解Java中keystore的创建与配置,涵盖私钥管理、自签名与CA证书生成、SSL/TLS应用,强调安全存储及验证机制,确保通信加密和... 目录1. 秘密键(私钥)的理解与管理私钥的定义与重要性私钥的管理策略私钥的生成与存储2. 证书的创建与

浅析Spring如何控制Bean的加载顺序

《浅析Spring如何控制Bean的加载顺序》在大多数情况下,我们不需要手动控制Bean的加载顺序,因为Spring的IoC容器足够智能,但在某些特殊场景下,这种隐式的依赖关系可能不存在,下面我们就来... 目录核心原则:依赖驱动加载手动控制 Bean 加载顺序的方法方法 1:使用@DependsOn(最直