ThreadPoolExecutor-线程池知多少

2024-03-14 08:58

本文主要是介绍ThreadPoolExecutor-线程池知多少,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ThreadPoolExecutor

创建线程池的目的?

线程池为了避免频繁的创建和销毁所带来的的性能消耗,而建立的一种池化技术,它是把已经创建的线程放入到池中。当有任务来临的时候可以重用已有的线程,无需等待创建的过程,这用有效的提高程序的响应速度。

为什么用ThreadPoolExecutor创建而不是Executors

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

Executors返回的线程池对象的弊端:

newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool

上面三个方法的底层原理都是使用ThreadPoolExecutor

newSingleThreadScheduledExecutor

允许请求的队列长度为Integer.MAX_VALUE可能会堆积大量的请求,从而导致OOM

ThreadPoolExecutor构造

/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set//表示线程池的常驻核心数,如果设置为0,则表示在没有任何任务,销毁线程池;如果大于0,即使没有任务,也会保证线程池的线程数量等于此值。如果设置过小,频繁的创建和销毁,设置过大,浪费系统资源。* @param maximumPoolSize the maximum number of threads to allow in the*        pool必须大于0,大于corePoolSize。此值只有在任务比较多,且不能存放在任务队列时,才能用到。* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.表示线程的存活时间,当线程池空闲时间并且超过了此时间,多余的线程就会销毁到线程池的核心线程数的数量为止。* @param unit the time unit for the {@code keepAliveTime} argumentkeepAliveTime的时间单位* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.线程池的任务队列,线程池的所有线程都在处理任务的时候,新任务缓存到这里排队等到执行* @param threadFactory the factory to use when the executor*        creates a new thread通常在创建线程池时不指定此参数,会使用默认的线程创建工厂的模式,创建线程。* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached线程池的拒绝策略,如果workQueue存储满之后,不能创建新的线程来执行任务,就会使用拒绝策略,属于限流保护机制* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

第六个参数threadFactory默认的线程创建工厂

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {// Executors.defaultThreadFactory() 为默认的线程创建工厂this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();
}
// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}// 创建线程public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()) t.setDaemon(false); // 创建一个非守护线程if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值return t;}
}

第七个参数RejectedExecutionHandler handler

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 当前工作的线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {// 创建新的线程执行此任务if (addWorker(command, true))return;c = ctl.get();}// 检查线程池是否处于运行状态,如果是则把任务添加到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再出检查线程池是否处于运行状态,防止在第一次校验通过后线程池关闭// 如果是非运行状态,则将刚加入队列的任务移除if (! isRunning(recheck) && remove(command))reject(command);// 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会发生)else if (workerCountOf(recheck) == 0)addWorker(null, false); // 新建线程执行任务}// 核心线程都在忙且队列都已爆满,尝试新启动一个线程执行失败else if (!addWorker(command, false)) // 执行拒绝策略reject(command);
}
addWorker(Runnable firstTask, boolean core)

第一个参数表示,线程应首先运行的任务,如果没有则设置为null

第二个参数,true表示使用corePoolSize作为阀值,false表示,maximumPoolSize作为阀值。

任务使用线程图:

在这里插入图片描述

execute与submit方法的区别

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 使用
executor.execute(new Runnable() {@Overridepublic void run() {System.out.println("Hello, execute.");}
});
// submit 使用
Future<String> future = executor.submit(new Callable<String>() {@Overridepublic String call() throws Exception {System.out.println("Hello, submit.");return "Success";}
});
System.out.println(future.get());

两者都是执行线程池任务的,submit可以接受返回值。submit配合future使用。

submit是继承executorService中的方法,execute是继承Executor中的方法。

线程池的拒绝策略

AbortPolicy,终止策略,线程池会抛出异常并终止执行,他是默认的拒绝策略

CallerRunsPolicy,把任务交给当前线程来执行

DiscardPolicy,忽略此任务(最新的任务)

DiscardOldestPolicy,忽略最早的任务(最先加入队列的任务)

 ThreadPoolExecutor executor=new ThreadPoolExecutor(1,3,10,TimeUnit.SECONDS,new LinkedBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 6; i++) {executor.execute(()->{System.out.println(Thread.currentThread().getName());});}

运行结果

pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1
pool-1-thread-3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.tangtang.boot.launch.LeetCodeSolution.ThreadExample3$$Lambda$1/1496724653@48533e64 rejected from java.util.concurrent.ThreadPoolExecutor@64a294a6[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 5]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at com.example.tangtang.boot.launch.LeetCodeSolution.ThreadExample3.main(ThreadExample3.java:9)

分析:采用了AbortPolicy拒绝策略,抛出了异常,队列中可以存储两个任务,最大可以创建三个线程来完成任务(2+3=5),所以第六个来了就会抛出异常。

自定义拒绝策略

ThreadPoolExecutor executor=new ThreadPoolExecutor(1, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("这是自定义的拒绝策略");}});for (int i = 0; i <15; i++) {executor.execute(()->{System.out.println(Thread.currentThread().getName());});}

结果如下:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
这是自定义的拒绝策略
pool-1-thread-2
pool-1-thread-2
pool-1-thread-3

ThreadPoolExecutor扩展方法

beforeExecute
afterExecute
public class ThreadPoolExtend {public static void main(String[] args) {MyThreadPoolExecutor executor=new MyThreadPoolExecutor(2,4,10, TimeUnit.SECONDS,new LinkedBlockingQueue());for (int i = 0; i < 3; i++) {executor.execute(()->{Thread.currentThread().getName();});}}static class MyThreadPoolExecutor extends ThreadPoolExecutor{private final ThreadLocal<Long> localTime = new ThreadLocal<>();public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}@Overrideprotected void beforeExecute(Thread t, Runnable r) {Long sTime = System.nanoTime();// 开始时间 (单位:纳秒)localTime.set(sTime);System.out.println(String.format("%s | before | time=%s",t.getName(),sTime));super.beforeExecute(t,r);}@Overrideprotected void afterExecute(Runnable r,Throwable t){Long eTime = System.nanoTime();// 开始时间 (单位:纳秒)Long totalTime = eTime - localTime.get(); // 执行总时间System.out.println(String.format("%s | after | time=%s | 耗时:%s 毫秒",Thread.currentThread().getName(),eTime,(totalTime/1000000.0)));super.afterExecute(r,t);}}

pool-1-thread-1 | before | time=38938852458000
pool-1-thread-2 | before | time=38938852540300
pool-1-thread-1 | after | time=38938882198300 | 耗时:29.7403 毫秒
pool-1-thread-2 | after | time=38938882205900 | 耗时:29.6656 毫秒
pool-1-thread-1 | before | time=38938883845000
pool-1-thread-1 | after | time=38938884124100 | 耗时:0.2791 毫秒

这篇关于ThreadPoolExecutor-线程池知多少的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/807894

相关文章

Java线程面试题(50)

不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员的欢迎。大多数待遇丰厚的Java开发职位都要求开发者精通多线程技术并且有丰富的Java程序开发、调试、优化经验,所以线程相关的问题在面试中经常会被提到。 在典型的Java面试中, 面试官会从线程的基本概念问起, 如:为什么你需要使用线程,

线程池ThreadPoolExecutor类源码分析

Java并发编程:线程池的使用   在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:   如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。   那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

线程Lock

线程Lock   在上一篇文章中我们讲到了如何使用关键字synchronized来实现同步访问。本文我们继续来探讨这个问题,从Java 5之后,在java.util.concurrent.locks包下提供了另外一种方式来实现同步访问,那就是Lock。   也许有朋友会问,既然都可以通过synchronized来实现同步访问了,那么为什么还需要提供Lock?这个问题将在下面进行阐述。本文先从s

线程封装,互斥

文章目录 线程封装线程互斥加锁、解锁认识接口解决问题理解锁 线程封装 C/C++代码混编引起的问题 此处pthread_create函数要求传入参数为void * func(void * )类型,按理来说ThreadRoutine满足,但是 这是在内类完成封装,所以ThreadRoutine函数实际是两个参数,第一个参数Thread* this不显示 解决方法: 第

Linux-笔记 线程同步机制

目录 前言 实现 信号量(Semaphore) 计数型信号量 二值信号量  信号量的原语操作 无名信号量的操作函数 例子 互斥锁(mutex) 互斥锁的操作函数 例子 自旋锁 (Spinlock) 自旋锁与互斥锁的区别 自旋锁的操作函数 例子 前言         线程同步是为了对共享资源的访问进行保护,确保数据的一致性,由于进程中会有多个线程的存在,

jmeter之Thread Group(线程组)

Thread Group(线程组) 1.线程组,或者可以叫用户组,进行性能测试时的用户资源池。 2.是任何一个测试计划执行的开始点。 3.上一篇提到的“控制器”和“HTTP请求”(采集器)必须在线程组内;监听器等其他组件,可以直接放在测试计划下。 线程组设置参数的意义 我们以下图为例,进行详细说明。见下图:  区域1(在取样器错误后要执行的动作) 这个区域的主要作用很明显,在线程内

如何在Android中实现多线程与线程池?

目录 一、Android介绍二、什么是多线程三、什么是线程池四、如何在Android中实现多线程与线程池 一、Android介绍 Android是一种基于Linux内核的开源操作系统,由Google公司领导开发。它最初于2007年发布,旨在为移动设备提供一种统一、可扩展的操作系统。Android系统以其高度的可定制性和丰富的应用生态而受到广泛欢迎,如今已经成为全球最流行的

线程间通信方式(互斥(互斥锁)与同步(无名信号量、条件变量))

1通信机制:互斥与同步 线程的互斥通过线程的互斥锁完成; 线程的同步通过无名信号量或者条件变量完成。 2  互斥 2.1 何为互斥?         互斥是在多个线程在访问同一个全局变量的时候,先让这个线程争抢锁的资源,那个线程争抢到资源,它可以访问这个变量,没有争抢到资源的线程不能够访问这个变量。那这种只有一个线程能够访问到这个变量的现象称之为线程间互斥。 2.2互斥锁API 1.

线程C++

#include <thread>#include <chrono>#include <cmath>#include <mutex>#include <iostream>using namespace std;mutex mtx;void threadCommunicat(){int ans = 0;while (ans<=3){mtx.lock();//上锁cout << "a

线程知识点(一)

文章目录 一、线程是什么?二、进程与线程的关系三、种类内核级线程用户级线程混合型线程 总结 一、线程是什么? 线程是程序最基本的运行单位,真正运行的是进程中的线程。 线程是大多数操作系统支持的调度单位, 执行单元,某些系统不支持线程技术。 是允许应用程序并发执行多个任务的一种机制,同一程序中的所有线程均会独立执行相同程序。 共享同一份全局内存区域,其中包据初始化数据段、未初