(十) ExecutorService

2023-12-19 07:58
文章标签 executorservice

本文主要是介绍(十) ExecutorService,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言:上一篇FutureTask的执行过程中其实与ExecutorService有关,但是由于对这个知识点不是很熟,所以就略过了,现在专门就这个知识点学习一下。


参考博客:Java线程池 ExecutorService

demo地址:点击打开链接



1. ExecutorService的简单使用

简单地看了下ExecutorService的API,感觉还是适合一边看源码一边写例子比较靠谱,一上来就写个简单的例子热身一下。

package com.example.demo_10_executorservice;import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class MainActivity extends AppCompatActivity {private static final String TAG = "simpleTest";@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);ExecutorService executorService = Executors.newFixedThreadPool(5);Future<Integer> future = executorService.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int sum = 0;for(int i = 0; i< 10; i++){sum += i;}return sum;}});try {Log.d(TAG, "the sum is: " + future.get());}catch (InterruptedException | ExecutionException ex){ex.printStackTrace();}executorService.shutdown();}
}

简单来讲就是先新建一个大小为5的线程池,然后在里面执行一个计算0到10累加的任务,执行完毕以后将线程池关闭。(线程池建立看了下适合下一篇博客再写)


2. ExecutorService的继承关系

先拷贝一个总体上继承关系的图

本博客主要讲的ExecutorService就是继承的Executor接口,而ExecutorService本身自己也是一个接口。


2.1 Executor

先看下Executor:

* The {@code Executor} implementations provided in this package* implement {@link ExecutorService}, which is a more extensive* interface.  The {@link ThreadPoolExecutor} class provides an* extensible thread pool implementation. The {@link Executors} class* provides convenient factory methods for these Executors.** <p>Memory consistency effects: Actions in a thread prior to* submitting a {@code Runnable} object to an {@code Executor}* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>* its execution begins, perhaps in another thread.** @since 1.5* @author Doug Lea*/
public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}


从上面注释可以看出如下几点:

  1. ExecutorService是Executor更充分的实现
  2. Executors为线程池提供了便捷的工厂类(与Executor有关,但不是一回事)
  3. 一致性原则:任务提交总是在执行之前
  4. Executor只有void execute(Runnable command)一个方法


2.2 ExecutorService

ExecutorService本身是一个接口,它继承了Executor接口,并在此基础上进行了拓展。

public interface ExecutorService extends Executor
api接口如下图所示


3.  API分析及实际使用

本文主要还是就ExecutorService的api进行学习,但是由于它只是个接口,所以不可避免选择一个具体的实现进行接口的实际使用,所以还是很粗糙很不全面的演示,本文主要是用的newFixedThreadPool;最好还是后面学习Executors及其生成的各种线程池再来回顾一下。


3.1 submit

submit方法主要就是用来提交单个任务执行的,并且返回一个代表任务执行结果的Future对象,可以调用Future的get()方法获取任务的执行结果。

 /*** Submits a value-returning task for execution and returns a* Future representing the pending results of the task. The* Future's {@code get} method will return the task's result upon* successful completion.** <p>* If you would like to immediately block waiting* for a task, you can use constructions of the form* {@code result = exec.submit(aCallable).get();}** <p>Note: The {@link Executors} class includes a set of methods* that can convert some other common closure-like objects,* for example, {@link java.security.PrivilegedAction} to* {@link Callable} form so they can be submitted.** @param task the task to submit* @param <T> the type of the task's result* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/<T> Future<T> submit(Callable<T> task);/*** Submits a Runnable task for execution and returns a Future* representing that task. The Future's {@code get} method will* return the given result upon successful completion.** @param task the task to submit* @param result the result to return* @param <T> the type of the result* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/<T> Future<T> submit(Runnable task, T result);/*** Submits a Runnable task for execution and returns a Future* representing that task. The Future's {@code get} method will* return {@code null} upon <em>successful</em> completion.** @param task the task to submit* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/Future<?> submit(Runnable task);

从上面的源码可以看出几点不一样的细节:

  1. 如果想立刻阻塞等待任务执行结果可以调用result = exec.submit(aCallable).get()
  2. <T> Future<T> submit(Runnable task, T result)已将任务结果写死了,不能像Callable一样返回result
  3. Future<?> submit(Runnable task)执行成功后返回的结果是Null

对应两个demo,源码即运行结果如下:

private void submit2(){ExecutorService executorService = Executors.newFixedThreadPool(5);Future<String> future = executorService.submit(new Runnable() {@Overridepublic void run() {Log.d(TAG, "do something");}}, "jiatai happy");try {Log.d(TAG, "the result of submit2 is: " + future.get());}catch (InterruptedException | ExecutionException ex){ex.printStackTrace();}executorService.shutdown();}private void submit3(){ExecutorService executorService = Executors.newFixedThreadPool(5);Future<?> future = executorService.submit(new Runnable() {@Overridepublic void run() {Log.d(TAG, "do something");}});try {Log.d(TAG, "the result of submit3 is: " + future.get());}catch (InterruptedException | ExecutionException ex){ex.printStackTrace();}executorService.shutdown();}

运行结果:

03-11 11:07:37.740 6413-6413/com.example.demo_10_executorservice D/simpleTest: the result of submit1 is: 45
03-11 11:07:37.750 6413-6453/com.example.demo_10_executorservice D/simpleTest: do something
03-11 11:07:37.750 6413-6413/com.example.demo_10_executorservice D/simpleTest: the result of submit2 is: jiatai happy
03-11 11:07:37.752 6413-6459/com.example.demo_10_executorservice D/simpleTest: do something
03-11 11:07:37.752 6413-6413/com.example.demo_10_executorservice D/simpleTest: the result of submit3 is: null


3.2 shutdown and shutdownNow

当我们使用完成ExecutorService之后应该关闭它,否则它里面的线程会一直处于运行状态。

举个例子,如果的应用程序是通过main()方法启动的,在这个main()退出之后,如果应用程序中的ExecutorService没有关闭,这个应用将一直运行。之所以会出现这种情况,是因为ExecutorService中运行的线程会阻止JVM关闭。

如果要关闭ExecutorService中执行的线程,我们可以调用ExecutorService.shutdown()方法。在调用shutdown()方法之后,ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,所有在shutdown()执行之前提交的任务都会被执行。

如果我们想立即关闭ExecutorService,我们可以调用ExecutorService.shutdownNow()方法。这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成。

复制粘贴了一段,大概意思是说如果没有调用shutdown,有可能会导致内存泄露。

源码如下所示:

/*** Initiates an orderly shutdown in which previously submitted* tasks are executed, but no new tasks will be accepted.* Invocation has no additional effect if already shut down.** <p>This method does not wait for previously submitted tasks to* complete execution.  Use {@link #awaitTermination awaitTermination}* to do that.*/void shutdown();/*** Attempts to stop all actively executing tasks, halts the* processing of waiting tasks, and returns a list of the tasks* that were awaiting execution.** <p>This method does not wait for actively executing tasks to* terminate.  Use {@link #awaitTermination awaitTermination} to* do that.** <p>There are no guarantees beyond best-effort attempts to stop* processing actively executing tasks.  For example, typical* implementations will cancel via {@link Thread#interrupt}, so any* task that fails to respond to interrupts may never terminate.** @return list of tasks that never commenced execution*/List<Runnable> shutdownNow();

我对其他博客上的说法还是心存怀疑的,比较信服源码上的注释和我自己测试的结果,从上面的注释可以看出:

  1. shutdown原来是初始化一个有序的对已提交任务的关闭,但是不接受新的任务,如果线程池已经关闭了再次调用将没有影响
  2. 如果想要等待先前提及但还没执行完毕的任务,那么请调用awaitTermination
  3. shutdownNow会尝试停止所有活跃的正在执行的任务,暂停正在等待的任务,并且返回一个等待执行任务的集合。它不会等正在执行的任务完成。并且也不保证会停止正在执行的任务,和interrupt有异曲同工之妙。


下面就是写demo一个一个验证了。

3.2.1 验证shutdown

shutdown原来是初始化一个有序的对已提交任务的关闭,但是不接受新的任务,如果线程池已经关闭了再次调用将没有影响

shutdown自然会遇到4种情况

  1. 已经执行完的任务
  2. 正在执行的任务
  3. 正在等待执行的任务
  4. 已经调用shutdown后才提交的任务

所以写demo的时候最好是充分地考虑这种情况,API中对这四种情况说得不是很清楚。就我猜测:

  1. 对已经执行完的任务没有影响
  2. 对于正在执行的任务会等待任务执行完毕
  3. 对于正在等待执行的任务会取消执行
  4. 已经调用shutdown后才提及的任务是提交不了的

对应demo(task1 2 3 4分别和四种情况相对应):

private void shutdown1(){Log.d(TAG, "----shutdown1 begin---- ");ExecutorService executorService = Executors.newFixedThreadPool(1);Log.d(TAG, "----task1 begin---- ");Future future1 = executorService.submit(new Task1());Log.d(TAG, "----task2 begin---- ");Future future2 = executorService.submit(new Task2());try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}Log.d(TAG, "----task3 begin---- ");Future future3 = executorService.submit(new Task3());executorService.shutdown();Log.d(TAG, "----executorService.shutdown---- ");Future future4 = null;try {future4 = executorService.submit(new Task4());} catch (Exception e) {e.printStackTrace();} finally {}try {Log.d(TAG, "the result of shutdown1 is: " + future1.get());Log.d(TAG, "the result of shutdown1 is: " + future2.get());Log.d(TAG, "the result of shutdown1 is: " + future3.get());Log.d(TAG, "the result of shutdown1 is: " + future4.get());}catch (InterruptedException | ExecutionException ex){ex.printStackTrace();}catch (Exception ex){ex.printStackTrace();}Log.d(TAG, "----shutdown1 end---- ");}private class Task1 implements Callable<String>{@Overridepublic String call() throws Exception {int sum = 0;for(int i = 0; i< 10; i++){sum += i;}return "task1";}}private class Task2 implements Callable<String>{@Overridepublic String call() throws Exception {int sum = 0;for(int i = 0; i< 10; i++){sum += i;}Thread.sleep(2000);return "task2";}}private class Task3 implements Callable<String>{@Overridepublic String call() throws Exception {int sum = 0;for(int i = 0; i< 10; i++){sum += i;}Thread.sleep(2000);return "task3";}}private class Task4 implements Callable<String>{@Overridepublic String call() throws Exception {int sum = 0;for(int i = 0; i< 10; i++){sum += i;}Thread.sleep(2000);return "task4";}}

对应运行结果:

03-11 11:46:54.034 13116-13116/? D/simpleTest: ----shutdown1 begin---- 
03-11 11:46:54.035 13116-13116/? D/simpleTest: ----task1 begin---- 
03-11 11:46:54.035 13116-13116/? D/simpleTest: ----task2 begin---- 
03-11 11:46:54.536 13116-13116/? D/simpleTest: ----task3 begin---- 
03-11 11:46:54.537 13116-13116/? D/simpleTest: ----executorService.shutdown---- 
03-11 11:46:54.538 13116-13116/? D/simpleTest: the result of shutdown1 is: task1
03-11 11:46:56.036 13116-13116/com.example.demo_10_executorservice D/simpleTest: the result of shutdown1 is: task2
03-11 11:46:58.037 13116-13116/com.example.demo_10_executorservice D/simpleTest: the result of shutdown1 is: task3
03-11 11:46:58.038 13116-13116/com.example.demo_10_executorservice D/simpleTest: ----shutdown1 end---- 

task1用了0.5s,由于线程中间停了0.5s

task2用了2s,没问题

task3用了3.5s,1.5s等待+2s执行,有点小问题,以为它执行不了的=-=

task4由于无法执行抛出了异常,future是个空指针。

验证结果如下:

  1. 对已经执行的没影响
  2. 对正在执行的没影响
  3. 对于还在等待的任务也没影响=-=
  4. 对于shutdown后才提交的任务会抛出java.util.concurrent.RejectedExecutionException。


对于还在等待的任务有可能是有序的shutdown,正好由于时间比较短,还在等待的任务抽空执行完了?

我把task2的任务持续了20s,但task3这个任务还是抽空完成了,这里有可能有点问题,待续



3.2.2 验证shutdownNow

shutdownNow的demo只要把上面的shutdown demo改为调用shutdownNow就好了,但是结果有点奇怪。

执行到最后future2抛出了中断异常,说明shutdownNow的内部逻辑是调用到了Thread.interrupt的,而后面的future3和shutdown1 end反而调用不到了,好奇怪啊,也没有异常信息。

想了下future3的get()方法阻塞住了?shutdownNow把正在执行的任务干掉,也会让后来的任务抛出RejectedExecutionException,但是对future3这种等待中的任务没有很好地处理?

突然想起来“halts the processing of waiting tasks”指的是暂停所有等待的线程。

List<Runnable> list = executorService.shutdownNow();Log.d(TAG, "----executorService.shutdownNow---- ");for(Runnable runnable : list){Log.d(TAG, "runnable is : " + runnable.toString());}

然后打印出来的runnable就是一个

03-11 17:00:27.673 23862-23862/com.example.demo_10_executorservice D/simpleTest: runnable is : java.util.concurrent.FutureTask@42ffac

从这里可以看出调用shutdownNow以后暂停的任务是不能再调用get方法的,应该任务暂停了,get会一直阻塞住,会导致后续走不下去。


3.2.3 验证awaitTermination

和shutdownNow一样只改了一下调用的方法为awaitTermination,然后发现这个方法确实和api说的一样会阻塞住等所有任务都执行完,从3个任务的result打印时间可以看出是3个任务都完成了awaitTermination才结束阻塞的。

而从task4也可以得到结果来看awaitTermination和shutdown完全不是一回事,awaitTermination执行完后线程池可以继续接受任务。所以我最后加了个shutdown来关闭线程池,具体见demo,我就不贴代码了。

其他的api就略了,反正后面学习Executors还是会再来看一遍的。


4. 总结

主要了解了下ExecutorService主要api及其功能,结合具体的demo实现心中想法并得以确认,虽然还有些疑惑,留到后面学习Executors再说吧。


疑惑:shutdown调用后为什么等待的线程仍然可以执行完成?



这篇关于(十) ExecutorService的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/511438

相关文章

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析 时间 2014-07-27 16:15:07   CSDN博客 原文   http://blog.csdn.net/aitangyong/article/details/38172189 主题  Java ExecutorService是JDK并发工

Java零基础-线程池(`ExecutorService`)的使用

哈喽,各位小伙伴们,你们好呀,我是喵手。运营社区:C站/掘金/腾讯云/阿里云/华为云/51CTO;欢迎大家常来逛逛。   今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。   我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把自己所学到所会的,通过文章的形式进行输出

线程池原理–执行器ExecutorService

文章目录 线程池原理–执行器ExecutorService相关方法shutdownsubmit批量提交任务 线程池原理–总索引 线程池原理–执行器ExecutorService ExecutorService 也是一个接口,继承Executor接口。 java interface ExecutorService extends Executor 相关方法 shu

Java 并发学习之ExecutorService

在Java5之后,并发线程这块发生了根本的变化,最重要的莫过于新的启动、调度、管理线程的一大堆API了。在Java5以后,通过Executor来启动线程比用Thread的start()更好。在新特征中,可以很容易控制线程的启动、执行和关闭过程,还可以很容易使用线程池的特性 一、创建任务 任务就是一个实现了Runnable接口的类。 创建的时候实run方法即可。 二、执行任务 通过java.util

[Java] CountDownLatch和ExecutorService的简单使用

文章目录 [Java] CountDownLatch和ExecutorService的简单使用一、前言二、记录 [Java] CountDownLatch和ExecutorService的简单使用 一、前言 环境说明: JDK:1.8 官方API文档:https://docs.oracle.com/javase/8/docs/api/index.html 参考:

ExecutorService引发的血案(三)ThreadPoolExecutor

前面我们提到了ExecutorService结构中的一个工厂类,Executors。这个类提供了一系列构造ExecutorService实例的方法。 这些方法的核心就是两个类,分别是 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 类。(当然还有别的类,比较常用的就是这两个) 今天介绍的就是 ThreadPoolExecutor。 简介 这

ExecutorService引发的血案(二)ExecutorService使用

上一节中讲到了ExecutorService中有一些管理Thread的方法 execute(Runnable)submit(Runnable)submit(Callable)invokeAny(...)invokeAll(...) execute(Runnable) 这个方法使用的参数是 java.lang.Runnable 包中的对象,调用这个方法之后将会异步执行runnable。

ExecutorService引发的血案(一)结构

最近使用了github上面的一个多线程下载的库 MultiThreadDownloader,发现挺好用。于是打开看了一下源码,发现了ExecutorService这个东西。之前多多少少接触到了这个东西,知道是java并发编程里面使用的,但是一直不是很了解所以花点时间看了一下。 下面这张图,就是ExecutorService的结构图 (ScheduledThreadPoolExecutor稍

Runnable,Callable,Future,RunnableFuture,FutureTask,ExecutorService的关系

关系 Executor就是Runnable和Callable的调度容器,Future就是对于具体的调度任务的执行结果进行查看,最为关键的是Future可以检查对应的任务是否已经完成,也可以阻塞在get方法上一直等待任务返回结果。Runnable和Callable的差别就是Runnable是没有结果可以返回的,就算是通过Future也看不到任务调度的结果的。  FutureTask则是一个R

多线程--ExecutorService

相比ExecutorService,CompletionService可以更精确和简便地完成异步任务的执行 CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果。 ExecutorCompletionSer