本文主要是介绍Java并发类库中提供的线程池有哪几种,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、并发类库中的线程池
线程是不能重复启动的,频繁创建和销毁线程存在一定的开销。为了减小这种开销,提高资源利用率,我们可以使用线程池,让线程进行复用。使用线程池后,创建线程相对于从线程池中获取一个空闲线程,销毁线程相对于向线程池归还线程。
通常开发者都是利用Executors中的线程池创建方法创建线程池。通过指定不同的参数就可以创建出不同类型的线程池。
Executors提供的线程池创建配置:
- newCachedThreadPool(),是一种处理短时间内大量工作任务的线程池。它会缓存线程并重用,当无缓存线程可用时,它会创建新线程。如果线程空闲的时间超过60s,会被终止并移出缓存。长时间闲置时,这种线程池不会消耗生命资源。其内部使用SynchronousQueue作为工作队列。
- newFixedThreadPool(int nThreads),重用指定数目的线程,任何时候最多有nThreads个线程是活动的。其实部使用LinkedBlockingQueue作为工作队列。如果任务数量超过指定的线程数,那么任务会在工作队列中等待。
- newSingleThreadExecutor(),它的特点在于工作线程的数目为1,可以看做newFixedThreadPool的退化版。其内部使用LinkedBlockingQueue作为工作队列,所有任务都是被顺序执行,最多有一个任务处于活动状态。
- newSingleThreadExecutor()和newScheduledThreadPool(int corePoolSize),创建的是ScheduledExecutorService,可以进行定时的或周期性的工作制度,二者的区别在于是一个线程还是多个线程。
- newWorkingStealingPool(int parallelism),其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。
二、Executor框架结构
- Executor是一个基础的接口,其初衷是将任务提交和任务执行细节进行解耦。
void execute(Runnable command);
- ExecutorService更加完善,不仅提供service的管理功能,比如shutdown等方法,也提供了更加全面的任务提交机制,如返回Future的submit方法。
//输入的是Callable,解决了Runnable无法返回结果的问题
<T> Future<T> submit(Callable<T> task);
- ScheduledExecutorService,定时调度接口
- AbstractExecutorService,执行框架抽象类
- ThreadPoolExecutor,JDK中线程池的具体实现
- Executors,线程池工厂类,提供了各种方便的静态工厂方法
三、线程池的具体实现
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keyAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
通过配置不同的参数,就可以创建出行为大相径庭的线程池,这就是线程池高度灵活的基础。
- corePoolSize,核心线程数,可以大致理解为长期驻留的线程数目。对于不同的线程池,这个值不同。newFixedThreadPool会将其设置为nThreads,newCachedThreadPool会将其设置为0。
- maximunPoolSize,能够创建的最大线程数。对于newFixedThreadPool是nThreads,而对于newCachedThreadPool则为Integer.MAX_VALUE。
- keepAliveTiem和TimeUnit这两个参数指定了额外的线程能够闲置多久,有些线程池不需要它,newCachedThreadPool需要此参数
- workQueue,工作队列,必须为BlockingQueue。创建不同的线程池,可以选择不同的工作队列。有SynchronousQueue,LInkedBlockingQueue,ArrayBlockingQueue,PriorityBlockingQueue等。
- threadFactory提供线程创建逻辑。线程池的工作线程被抽象为静态内部类Worker,基于AQS实现。
- Handler指定了拒绝策略,当任务数量超过系统的负载时就可以使用拒绝策略了。
四、核心线程池的内部实现
对于核心的几个线程池,无论是newFixedThreadPool()方法,newSingleThreadExecutor()还是newCachedThreadPool()方法,虽然看起来创建的线程有着完全不同的功能特点,但是它们都是有ThreadPoolExecutor实现。下面给出了具体的实现方式:
public static ExecutorService newFixedThreadPool(int nThreads){return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}public static ExecutorService newSingleThreadExecutor(){return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQUeue<Runnable>());
}public static ExecutorService newCachedThreadPool(){return new ThreadPoolExecutor(0,Integer.MAX_VLAUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
五、ThreadPoolExecutor核心调度代码
这里给出ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:
public void execute(Runnable command){if(command==null)throw new NullPointerException();int c=ctl.get();if(workCountOf(c)<corePoolSize){if(addWorker(command,true))return c=ct1.get();}if(isRunning(c) && workQueue.offer(command)){int recheck=ce1.get();if(!isRunning(recheck) && remove(command))reject(command);else if(workCountOf(recheck) == 0)addWorkder(null,false);}else if(!addWorkder(command,fale))reject(command);
}
代码第5行的workerCountOf()函数取得了当前线程池的线程总数,当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则在第10行代码处加入等待队列(workQueue.offer())。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynnchronousQueue队列),则会执行第17行,将任务直接提交给线程池。如果当前线程数已经达到了maximunPoolSize,则会执行第18行的拒绝策略。
六、线程池的工作线程
线程池的工作线程被抽象为静态内部类worker,使用aqs实现。
参考文章:https://medium.com/@oliver_hu/java-series-basics-q-a-part-5-1613ae282ac2
这篇关于Java并发类库中提供的线程池有哪几种的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!