你真的搞懂Java 线程池了???未必吧!

2024-02-12 23:40
文章标签 java 线程 真的 搞懂 未必

本文主要是介绍你真的搞懂Java 线程池了???未必吧!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

什么是线程池

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件)则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

为什么要使用线程池

当系统创建一个新的线程,因为它涉及到与操作系统的交互,所以它的成本是比较高的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。当程序中需要使用存活周期很短的线程时,应该考虑使用线程池。

线程池的特点:

  1. 创建一个线程池,该线程池重用在一个共享的无界队列上运行的固定数量的线程。

  2. 在任何时候,在大多数情况下线程都是活动的处理任务。

  3. 如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有一个线程可用为止。

  4. 如果任何线程在关闭之前的执行过程中由于失败而终止,那么如果需要执行后续任务,则需要一个新线程来替代它

  5. 池中的线程将一直存在,直到显式结束。

使用线程池执行线程任务的步骤

  1. 调用 Executors 类的静态方法创建 ExecutorService 对象

  2. 创建一个类,实现 Runnable 接口

  3. 调用 ExecutorService 对象的 execute() 方法来执行 Runnable 接口实例

  4. 最后当不再使用时,调用 ExecutorService 对象的 shutdown() 方法关闭线程池

Executors 是一个工厂类,主要用来创建 ExecutorService:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

用 newFixedThreadPool() 这个静态方法用来创建一个可重用固定线程数的线程池。

四、示例代码

// 创建一个实现Runnable接口的类 Thread1
class Thread1 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 == 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}// 创建一个实现Runnable接口的类 Thread2
class Thread2 implements Runnable {@Overridepublic void run() {for (int i = 0; i <= 50; i++) {if (i % 2 != 0) {System.out.println(Thread.currentThread().getName() + "的i值为: " + i);}}}
}public class ThreadPool {public static void main(String[] args) {// 创建一个固定线程数的的线程池ExecutorService pool = Executors.newFixedThreadPool(10);// 向线程中提交两个任务,执行指定的线程的操作。需要提供实现Runnable接口实现类的对象pool.execute(new Thread1());pool.execute(new Thread2());// 关闭线程池pool.shutdown();}
}

运行结果:

五、剖析 ThreadPoolExecutor 底层源码

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

1、创建一个线程池时的参数

  • corePoolSize 表示核心池的大小

当提交一个任务到线程池的时候,线程池将会创建一个线程来执行任务。这个值的大小设置非常关键,设置过小将频繁地创建或销毁线程,设置过大则会造成资源浪费。线程池新建线程的时候,如果当前线程总数小于 corePoolSize,则新建的是核心线程;如果超过 corePoolSize,则新建的是非核心线程。

  • maximumPoolSize 表示线程池的最大数量

如果队列(workQueue))满了,并且已创建的线程数小于 maximumPoolSize,则线程池会进行创建新的线程执行任务。如果等待执行的线程数 大于 maximumPoolSize,缓存在队列中;如果 maximumPoolSize 等于 corePoolSize,即是固定大小线程池。

  • keepAliveTime 表示线程活动的保持时间

当需要执行的任务很多,线程池的线程数大于核心池的大小时,keepAliveTime 才起作用;当一个非核心线程,如果不干活 (闲置状态) 的时长超过这个参数所设定的时长,就会被销毁掉。

  • TimeUnit 表示线程活动保持时间的单位

它的单位有:TimeUnit.DAYS,TimeUnit.HOURS,TimeUnit.MINUTES,TimeUnit.MILLISECONDS,TimeUnit.MICRODECONDS

  • workQueue 表示线程池中存放被提交但尚未被执行的任务的队列

维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

  • threadFactory 用于设置创建线程的工厂

它用来给每个创建出来的线程设置一个名字,就可以知道线程任务是由哪个线程工厂产生的。

  • handler 表示拒绝处理策略

线程数量大于最大线程数,当超过 workQueue 的任务缓存区上限的时候,就可以调用该策略,这是一种简单的限流保护。

从 JDK 源码里面可以看到:

其中里面其设置的 corePoolSize(核心池的大小) 和 maximumPoolSize(最大线程数) 都是 nThreads,其设定的阻塞队列是无界的,则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。因此阿里巴约编码规范不推荐用 Executors 来创建 ThreadPoolExecutor。

使用 Executors 创建线程池时要明确创建的阻塞队列是否有界。因此最好自己创建 ThreadPoolExecutor。

ThreadPoolExecutor pool1 = (ThreadPoolExecutor) pool;// 设置核心池的大小pool1.setCorePoolSize(15);// setkeepAliveTime()方法 设置线程没有任务时最多保持多长时间后会停止pool1.setKeepAliveTime(60000, TimeUnit.HOURS);

Executors 的工厂方法主要就返回了 ThreadPoolExecutor 对象。

ThreadPoolExecutor 继承了 AbstractExecutorService:

public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 表示线程池数量的位数,很明显是29,Integer.SIZE=32private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程池最大数量,2^29 - 1private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}/*** Attempts to CAS-increment the workerCount field of ctl.*/private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);}/*** Attempts to CAS-decrement the workerCount field of ctl.*/private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);}/*** Decrements the workerCount field of ctl. This is called only on* abrupt termination of a thread (see processWorkerExit). Other* decrements are performed within getTask.*/private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get()));}/*** The queue used for holding tasks and handing off to worker* threads.  We do not require that workQueue.poll() returning* null necessarily means that workQueue.isEmpty(), so rely* solely on isEmpty to see if the queue is empty (which we must* do for example when deciding whether to transition from* SHUTDOWN to TIDYING).  This accommodates special-purpose* queues such as DelayQueues for which poll() is allowed to* return null even if it may later return non-null when delays* expire.*/// 用于存放线程任务的阻塞队列private final BlockingQueue<Runnable> workQueue;/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/// 重入锁private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/// 线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/// 等待条件支持终止private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;/** All user control parameters are declared as volatiles so that* ongoing actions are based on freshest values, but without need* for locking, since no internal invariants depend on them* changing synchronously with respect to other actions.*//*** Factory for new threads. All threads are created using this* factory (via method addWorker).  All callers must be prepared* for addWorker to fail, which may reflect a system or user's* policy limiting the number of threads.  Even though it is not* treated as an error, failure to create threads may result in* new tasks being rejected or existing ones remaining stuck in* the queue.** We go further and preserve pool invariants even in the face of* errors such as OutOfMemoryError, that might be thrown while* trying to create threads.  Such errors are rather common due to* the need to allocate a native stack in Thread.start, and users* will want to perform clean pool shutdown to clean up.  There* will likely be enough memory available for the cleanup code to* complete without encountering yet another OutOfMemoryError.*/// 创建新线程的线程工厂private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/// 饱和策略private volatile RejectedExecutionHandler handler;

总结:workerCount:表示有效的线程数目 runState:表示线程池里线程的运行状态

2、重要方法

① execute() 方法

// 调用execute方法将线程提交到线程池中 public void execute(Runnable command) {// 如果执行的任务为空,则会抛出空指针异常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1\. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2\. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3\. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/// 获取线程池的控制状态int c = ctl.get();// 如果workerCount值小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 添加任务到worker集合当中,成功的话返回if (addWorker(command, true))return;// 如果失败,再次获取线程池的控制状态c = ctl.get();}// 如果corePoolSize已经满了,则需要加入到阻塞队列// 判断线程池的状态以及是否可以往阻塞队列中继续添加runnableif (isRunning(c) && workQueue.offer(command)) {// 获取线程池的状态int recheck = ctl.get();// 再次检查状态,线程池不处于RUNNING状态,将任务从workQueue队列中移除if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果此时队列已满,则会采取相应的拒绝策略else if (!addWorker(command, false))reject(command);} 

总结:往 corePoolSize 中加入任务进行执行 当 corePoolSize 满时往阻塞队列中加入任务 阻塞队列满时并且 maximumPoolSize 已满,则采取相应的拒绝策略

② addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {// 外部循环标志retry:for (;;) {// 获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取workerCountint wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 跳出循环    if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 实例化worker对象w = new Worker(firstTask);// 获取worker的线程final Thread t = w.thread;// 如果获取的线程不为空if (t != null) {// 初始线程池的锁final ReentrantLock mainLock = this.mainLock;// 得到锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 获取锁后再次检查,获取线程池runStateint rs = runStateOf(ctl.get());// 判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 检查线程是否启动if (t.isAlive()) // precheck that t is startable// 如果未启动存活,则抛出异常throw new IllegalThreadStateException();// 往corePoolSize中加入任务    workers.add(w);// 获取workers集合的大小int s = workers.size();// 如果大小超过largestPoolSizeif (s > largestPoolSize)// 重新设置线程池拥有最大线程数的大小largestPoolSize = s;// 改变状态    workerAdded = true;}} finally {// 释放锁mainLock.unlock();}if (workerAdded) {// 运行t.start();// 改变状态workerStarted = true;}}} finally {// 如果worker没有启动成功if (! workerStarted)addWorkerFailed(w);}// 返回worker是否成功启动的标记return workerStarted;
} 

使用线程池的好处

  • 降低资源消耗 。重复利用线程池中已经创建好的线程,不需要每次都创建,降低创建和销毁造成的消耗。

  • 提高响应的速度。当任务分配下来时,任务无需等到创建线程就能被执行,减少了创建线程的时间。

  • 方便进行线程管理。线程无限制的被创建,会占用系统资源并且还会降低系统的稳定性。使用线程池可以进行统一管理,设置核心池的大小,设置线程没有任务时最多保持多长时间。


作者:superstar001

链接:

https://zhuanlan.zhihu.com/p/259324524

这篇关于你真的搞懂Java 线程池了???未必吧!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/703928

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2