本文主要是介绍线程池ThreadPool工作中的再次思考,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
ThreadPoolExecutor类实现了ExecutorService接口和Executor接口,可以设置线程池corePoolSize,最大线程池大小maximumPoolSize,拒绝策略。构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
参数含义:
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
unit: 线程池维护线程所允许的空闲时间的单位
workQueue:线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
当一个任务通过execute(Runnable)方法欲添加到线程池时:
l 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
2 如果此时线程池中的数量大于等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
3 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
4 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
需要注意的是:如果缓冲队列workQueue是采用无界的话,那么maximumPoolSize的设置将无意义。当线程池中的数量大于等于 corePoolSize新的任务会一直添加到缓存队列中。
workQueue常用的是:ArrayBlockingQueue LinkedBlockingQueue等。
常用的拒绝策略:
AbortPolicy
为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
该策略直接在调用者线程中运行当前被拒绝的任务,但是,调用者线程性能可能急剧下降。
最近在工作中接触到一个比较暴力的线程拒绝策略,引发了比较大的问题,当并发多时,时常导致服务器down掉。
我将工作中的用到的线程池简化为如下:
/*** * @author kxl**/
package test;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class WorkThreadPool {private final static int corePoolSize = 2;private final static int maximumPoolSize = 4;private final static int keepAliveTime =3;private final static long waitTime =5;private static ThreadPoolExecutor workers;public static void init() {BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(1);//有界 队列长度1RejectedExecutionHandler handler = new RejectedExecutionHandler() {public void rejectedExecution(Runnable r,ThreadPoolExecutor executor) {try {Thread.sleep(WorkThreadPool.waitTime);} catch (InterruptedException e) {}executor.execute(r);}};if (workers == null) {workers = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue,handler);workers.allowCoreThreadTimeOut(true);}}public synchronized static void addTask(Integer task) {if (workers == null)init();workers.execute(new WorkThread(task));//System.out.println("worder size=="+workers.getPoolSize());}public synchronized static boolean removeTask(Integer publishQueue) {if (workers == null)return false;for (Runnable runnable : workers.getQueue()) {WorkThread pubThread = (WorkThread) runnable;if (publishQueue != null&& publishQueue.equals(pubThread.getTask())) {return workers.remove(runnable);}}return false;}public static void main(String[] args) {for(int i=0;i<200;i++){WorkThreadPool.addTask(i);}}}class WorkThread implements Runnable {public int task;public WorkThread(int i){this.task=i;}public int getTask() {return task;}public void setTask(int task) {this.task = task;}public void run() {System.out.println(Thread.currentThread().getName() + " is running task"+task);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
上面构建了一个corePoolSize为2 最大线程为4,且缓存队列长度为1。来测试一下改拒绝策略。
由上面的结果可以看到添加新任务线程时,进入拒绝策略,又调用executor.execute(r);可以看该源码,里面又将这个任务继续添加进线程任务中,然后又会被执行到拒绝策略,有疑似死循环的迹象。非常不合理的处理方案。
可以考虑用CallerRunsPolicy策略:
RejectedExecutionHandler handler = new RejectedExecutionHandler() {public void rejectedExecution(Runnable r,ThreadPoolExecutor executor) {try {Thread.sleep(WorkThreadPool.waitTime);} catch (InterruptedException e) {}//executor.execute(r);WorkThread pubThread = (WorkThread) r;System.out.println("拒绝策略:"+pubThread.getTask());//上面注释的方法,大量并发时,一直尝试往已经满了的线程池添加任务,一直重复进入拒绝策略(疑似死循环)//主线程去调用 相当于CallerRunsPolicy策略if (!executor.isShutdown()) {r.run();}}};if (workers == null) {workers = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, TimeUnit.SECONDS, workQueue,handler);workers.allowCoreThreadTimeOut(true);}
上面的代码相当于
workers = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,TimeUnit.SECONDS, workQueue,new ThreadPoolExecutor.CallerRunsPolicy());
我为了打印一些输出,用上面的方式,可以看callerRunsPolicy源码,里面就是通过r.run进行调用的。
如上图输出,当进入拒绝策略后,会由调用者线程中(在本例中就是main主线程)去执行被拒绝的任务,而不是像上面的方法,再次添加到线程任务中这么粗暴的方案了。
这篇关于线程池ThreadPool工作中的再次思考的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!