你真的搞懂Java 线程池了???未必吧!

2024-02-12 23:40
文章标签 java 线程 真的 搞懂 未必

本文主要是介绍你真的搞懂Java 线程池了???未必吧!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件)则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么要使用线程池

当系统创建一个新的线程,因为它涉及到与操作系统的交互,所以它的成本是比较高的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。当程序中需要使用存活周期很短的线程时,应该考虑使用线程池。

线程池的特点:

  1. 创建一个线程池,该线程池重用在一个共享的无界队列上运行的固定数量的线程。

  2. 在任何时候,在大多数情况下线程都是活动的处理任务。

  3. 如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有一个线程可用为止。

  4. 如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,则需要一个新线程来替代它

  5. 池中的线程将一直存在,直到显式结束。

使用线程池执行线程任务的步骤

  1. 调用 Executors 类的静态方法创建 ExecutorService 对象

  2. 创建一个类,实现 Runnable 接口

  3. 调用 ExecutorService 对象的 execute() 方法来执行 Runnable 接口实例

  4. 最后当不再使用时,调用 ExecutorService 对象的 shutdown() 方法关闭线程池

Executors 是一个工厂类,主要用来创建 ExecutorService:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

用 newFixedThreadPool() 这个静态方法用来创建一个可重用固定线程数的线程池。

四、示例代码

// 创建一个实现Runnable接口的类 Thread1
class Thread1 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 == 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}// 创建一个实现Runnable接口的类 Thread2
class Thread2 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 != 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}public class ThreadPool {public static void main(String[] args) {// 创建一个固定线程数的的线程池ExecutorService pool = Executors.newFixedThreadPool(10);// 向线程中提交两个任务,执行指定的线程的操作。需要提供实现Runnable接口实现类的对象pool.execute(new Thread1());pool.execute(new Thread2());// 关闭线程池pool.shutdown();}
}

运行结果:

五、剖析 ThreadPoolExecutor 底层源码

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

1、创建一个线程池时的参数

  • corePoolSize 表示核心池的大小

当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。

  • maximumPoolSize 表示线程池的最大数量

如果队列(workQueue))满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。如果等待执行的线程数 大于 maximumPoolSize,缓存在队列中;如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。

  • keepAliveTime 表示线程活动的保持时间

当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;当一个非核心线程,如果不干活 (闲置状态) 的时长超过这个参数所设定的时长,就会被销毁掉。

  • TimeUnit 表示线程活动保持时间的单位

它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS

  • workQueue 表示线程池中存放被提交但尚未被执行的任务的队列

维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

  • threadFactory 用于设置创建线程的工厂

它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。

  • handler 表示拒绝处理策略

线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。

从 JDK 源码里面可以看到:

其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。

使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。

ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;// 设置核心池的大小pool1.setCorePoolSize(15);// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止pool1.setKeepAliveTime(60000, TimeUnit.HOURS);

Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象。

ThreadPoolExecutor 继承了 AbstractExecutorService:

public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 表示线程池数量的位数,很明显是29,Integer.SIZE=32private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程池最大数量,2^29 - 1private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}/*** The queue used for holding tasks and handing off to worker* threads.  We do not require that workQueue.poll() returning* null necessarily means that workQueue.isEmpty(), so rely* solely on isEmpty to see if the queue is empty (which we must* do for example when deciding whether to transition from* SHUTDOWN to TIDYING).  This accommodates special-purpose* queues such as DelayQueues for which poll() is allowed to* return null even if it may later return non-null when delays* expire.*/// 用于存放线程任务的阻塞队列private final BlockingQueue<Runnable> workQueue;/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/// 重入锁private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/// 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/// 等待条件支持终止private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;/** All user control parameters are declared as volatiles so that* ongoing actions are based on freshest values, but without need* for locking, since no internal invariants depend on them* changing synchronously with respect to other actions.*//*** Factory for new threads. All threads are created using this* factory (via method addWorker).  All callers must be prepared* for addWorker to fail, which may reflect a system or user's* policy limiting the number of threads.  Even though it is not* treated as an error, failure to create threads may result in* new tasks being rejected or existing ones remaining stuck in* the queue.** We go further and preserve pool invariants even in the face of* errors such as OutOfMemoryError, that might be thrown while* trying to create threads.  Such errors are rather common due to* the need to allocate a native stack in Thread.start, and users* will want to perform clean pool shutdown to clean up.  There* will likely be enough memory available for the cleanup code to* complete without encountering yet another OutOfMemoryError.*/// 创建新线程的线程工厂private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/// 饱和策略private volatile RejectedExecutionHandler handler;

总结:workerCount:表示有效的线程数目 runState:表示线程池里线程的运行状态

2、重要方法

① execute() 方法

// 调用execute方法将线程提交到线程池中 public void execute(Runnable command) {// 如果执行的任务为空,则会抛出空指针异常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1\. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2\. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3\. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取线程池的控制状态int c = ctl.get();// 如果workerCount值小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 添加任务到worker集合当中,成功的话返回if (addWorker(command, true))return;// 如果失败,再次获取线程池的控制状态c = ctl.get();}// 如果corePoolSize已经满了,则需要加入到阻塞队列// 判断线程池的状态以及是否可以往阻塞队列中继续添加runnableif (isRunning(c) && workQueue.offer(command)) {// 获取线程池的状态int recheck = ctl.get();// 再次检查状态,线程池不处于RUNNING状态,将任务从workQueue队列中移除if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果此时队列已满,则会采取相应的拒绝策略else if (!addWorker(command, false))reject(command);} 

总结:往 corePoolSize 中加入任务进行执行 当 corePoolSize 满时往阻塞队列中加入任务 阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略

② addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {// 外部循环标志retry:for (;;) {// 获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取workerCountint wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 跳出循环    if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 实例化worker对象w = new Worker(firstTask);// 获取worker的线程final Thread t = w.thread;// 如果获取的线程不为空if (t != null) {// 初始线程池的锁final ReentrantLock mainLock = this.mainLock;// 得到锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 获取锁后再次检查,获取线程池runStateint rs = runStateOf(ctl.get());// 判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否启动if (t.isAlive()) // precheck that t is startable// 如果未启动存活,则抛出异常throw new IllegalThreadStateException();// 往corePoolSize中加入任务    workers.add(w);// 获取workers集合的大小int s = workers.size();// 如果大小超过largestPoolSizeif (s > largestPoolSize)// 重新设置线程池拥有最大线程数的大小largestPoolSize = s;// 改变状态    workerAdded = true;}} finally {// 释放锁mainLock.unlock();}if (workerAdded) {// 运行t.start();// 改变状态workerStarted = true;}}} finally {// 如果worker没有启动成功if (! workerStarted)addWorkerFailed(w);}// 返回worker是否成功启动的标记return workerStarted;
} 

使用线程池的好处

  • 降低资源消耗 。重复利用线程池中已经创建好的线程,不需要每次都创建,降低创建和销毁造成的消耗。

  • 提高响应的速度。当任务分配下来时,任务无需等到创建线程就能被执行,减少了创建线程的时间。

  • 方便进行线程管理。线程无限制的被创建,会占用系统资源并且还会降低系统的稳定性。使用线程池可以进行统一管理,设置核心池的大小,设置线程没有任务时最多保持多长时间。


作者:superstar001

链接:

https://zhuanlan.zhihu.com/p/259324524

这篇关于你真的搞懂Java 线程池了???未必吧!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/703928

相关文章

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.