ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

2024-09-04 12:48

本文主要是介绍ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

ExecutorService是JDK并发工具包提供的一个核心接口,相当于一个线程池,提供执行任务和管理生命周期的方法。 ExecutorService接口中的大部分API都是比较容易上手使用的,本文主要介绍下invokeAll和invokeAll方法的特性和使用。我们先提供几个任务类:一个耗时任务,一个异常任务,一个短时任务。他们会在接下来的测试代码中使用。

package tasks;import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;public class SleepSecondsCallable implements Callable<String>
{
  private String name;  private int seconds;  public SleepSecondsCallable(String name, int seconds)
  {
    this.name = name;
    this.seconds = seconds;
  }  public String call() throws Exception
  {
    System.out.println(name + ",begin to execute");    try
    {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e)
    {
      System.out.println(name + " was disturbed during sleeping.");
      e.printStackTrace();
      return name + "_SleepSecondsCallable_failed";
    }    System.out.println(name + ",success to execute");    return name + "_SleepSecondsCallable_succes";
  }}

这是一个通过睡眠来模拟的耗时任务,该任务是可中断/可终止的任务,能够响应中断请求。

package tasks;import java.util.concurrent.Callable;public class ExceptionCallable implements Callable<String>
{  private String name = null;  public ExceptionCallable()
  {  }  public ExceptionCallable(String name)
  {
    this.name = name;
  }  @Override
  public String call() throws Exception
  {
    System.out.println("begin to ExceptionCallable.");    System.out.println(name.length());    System.out.println("end to ExceptionCallable.");    return name;
  }}

这是一个可能会在执行过程中,抛出空指针异常的任务。

package tasks;import java.util.Random;
import java.util.concurrent.Callable;public class RandomTenCharsTask implements Callable<String>
{  @Override
  public String call() throws Exception
  {
    System.out.println("RandomTenCharsTask begin to execute...");    StringBuffer content = new StringBuffer();    String base = "abcdefghijklmnopqrstuvwxyz0123456789";    Random random = new Random();    for (int i = 0; i < 10; i++)
    {
      int number = random.nextInt(base.length());
      content.append(base.charAt(number));
    }    System.out.println("RandomTenCharsTask complete.result=" + content);
    return content.toString();
  }}

这是一个正常的短时的任务,产生10个随机字符组成的字符串。

1.测试invokeAny()

第一种情况,向线程池提交2个耗时任务SleepSecondsCallable

/*** 提交的任务集合,一旦有1个任务正常完成(没有抛出异常),会终止其他未完成的任务*/
public static void invokeAny1() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(3);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();  tasks.add(new SleepSecondsCallable("t1", 2));
  tasks.add(new SleepSecondsCallable("t2", 1));  String result = executorService.invokeAny(tasks);  System.out.println("result=" + result);  executorService.shutdown();
}
程序的执行结果是:返回t2线程的执行结果t2_SleepSecondsCallable_succes,同时t1抛出java.lang.InterruptedException: sleep interrupted。

也就说: 一旦有1个任务正常完成(执行过程中没有抛异常),线程池会终止其他未完成的任务 。

第二种情况,向线程池提交3个异常任务ExceptionCallable

/**
* 没有1个正常完成的任务,invokeAny()方法抛出ExecutionException,封装了任务中元素的异常
* 
*/
public static void invokeAny2() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(3);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());  String result = executorService.invokeAny(tasks);  System.out.println("result=" + result);  executorService.shutdown();
}
程序执行结果是:调用invokeAny()报错 java.util.concurrent.ExecutionException: java.lang.NullPointerException。

也就是说:

如果提交的任务列表中,没有1个正常完成的任务,那么调用invokeAny会抛异常,究竟抛的是哪儿个任务的异常,无关紧要

第三种情况:先提交3个异常任务,再提交1个正常的耗时任务

/**
* 有异常的任务,有正常的任务,invokeAny()不会抛异常,返回最先正常完成的任务
*/
public static void invokeAny3() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(3);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());  tasks.add(new SleepSecondsCallable("t1", 2));  String result = executorService.invokeAny(tasks);  System.out.println("result=" + result);
  executorService.shutdown();
}

程序执行结果是:不会抛出任何异常,打印出t2任务的返回结果。也就是说:

invokeAny()和任务的提交顺序无关,只是返回最早正常执行完成的任务

第四种情况,测试下使用限时版本的invokeAny(),主要功能与不限时版本的差别不大

/*** 还没有到超时之前,所以的任务都已经异常完成,抛出ExecutionException<br>* 如果超时前满,还没有没有完成的任务,抛TimeoutException*/
public static void invokeAnyTimeout() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(3);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());
  tasks.add(new ExceptionCallable());  String result = executorService.invokeAny(tasks, 2, TimeUnit.SECONDS);  System.out.println("result=" + result);  executorService.shutdown();
}
程序执行结果是:抛出ExecutionException。这个其实很合理,也很好理解。如果在超时之前,所有任务已经都是异常终止,那就没有必要在等下去了;如果超时之后,仍然有正在运行或等待运行的任务,那么会抛出TimeoutException。

最后我们来看下,JDK源码中ExecutorService.invokeAny的方法签名和注释

/**
  * Executes the given tasks, returning the result
  * of one that has completed successfully (i.e., without throwing
  * an exception), if any do. Upon normal or exceptional return,
  * tasks that have not completed are cancelled.
  * The results of this method are undefined if the given
  * collection is modified while this operation is in progress.
  *
  * @param tasks the collection of tasks
  * @return the result returned by one of the tasks
  * @throws InterruptedException if interrupted while waiting
  * @throws NullPointerException if tasks or any of its elements
  *	    are <tt>null</tt>
  * @throws IllegalArgumentException if tasks is empty
  * @throws ExecutionException if no task successfully completes
  * @throws RejectedExecutionException if tasks cannot be scheduled
  *	    for execution
  */<T> T invokeAny(Collection<? extends Callable<T>> tasks)
     throws InterruptedException, ExecutionException;

与我们测试结果一致,invokeAny()返回最先正常完成(without throwing exception)的任务直接结果;一旦有任务正常完成或者调用出现异常,线程池都会终止正在运行或等待运行(tasks that have not completed are cancelled)的任务。

2.测试invokeAll()

这个方法相对来说比较好理解,就是执行任务列表中的所有任务,并返回与每个任务对应的Futue。也就是说,任务彼此之间不会相互影响,可以通过future跟踪每一个任务的执行情况,比如是否被取消,是正常完成,还是异常完成,这主要使用Future类提供的API。

public static void testInvokeAll() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(5);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();
  tasks.add(new SleepSecondsCallable("t1", 2));
  tasks.add(new SleepSecondsCallable("t2", 2));
  tasks.add(new RandomTenCharsTask());
  tasks.add(new ExceptionCallable());  // 调用该方法的线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
  List<Future<String>> results = executorService.invokeAll(tasks);  // 任务列表中所有任务执行完毕,才能执行该语句
  System.out.println("wait for the result." + results.size());  executorService.shutdown();  for (Future<String> f : results)
  {
    // isCanceled=false,isDone=true
    System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
        + f.isDone());    // ExceptionCallable任务会报ExecutionException
    System.out.println("task result=" + f.get());
  }
}

程序的执行结果和一些结论,已经直接写在代码注释里面了。invokeAll是一个阻塞方法,会等待任务列表中的所有任务都执行完成。不管任务是正常完成,还是异常终止,Future.isDone()始终返回true。通过

Future.isCanceled()可以判断任务是否在执行的过程中被取消。通过Future.get()可以获取任务的返回结果,或者是任务在执行中抛出的异常。

第二种情况,测试限时版本的invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)

/*** 可以通过Future.isCanceled()判断任务是被取消,还是完成(正常/异常)<br>* Future.isDone()总是返回true,对于invokeAll()的调用者来说,没有啥用*/
public static void testInvokeAllTimeout() throws Exception
{
  ExecutorService executorService = Executors.newFixedThreadPool(5);  List<Callable<String>> tasks = new ArrayList<Callable<String>>();
  tasks.add(new SleepSecondsCallable("t1", 2));
  tasks.add(new SleepSecondsCallable("t2", 2));
  tasks.add(new SleepSecondsCallable("t3", 3));
  tasks.add(new RandomTenCharsTask());  List<Future<String>> results = executorService.invokeAll(tasks, 1,
      TimeUnit.SECONDS);  System.out.println("wait for the result." + results.size());  for (Future<String> f : results)
  {
    System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
        + f.isDone());
  }  executorService.shutdown();}

执行结果是:

wait for the result.4

isCanceled=true,isDone=true

isCanceled=true,isDone=true

isCanceled=true,isDone=true

isCanceled=false,isDone=true

也就是说给定的超时期满,还没有完成的任务会被取消,即Future.isCancelled()返回true;在超时期之前,无论是正常完成还是异常终止的任务, Future.is

Cancelled()返回false。

第三种情况,测试在等待invokeAll执行完成之前,线程被中断。

/*** 如果线程在等待invokeAll()执行完成的时候,被中断,会抛出InterruptedException<br>* 此时线程池会终止没有完成的任务,这主要是为了减少资源的浪费.*/
public static void testInvokeAllWhenInterrupt() throws Exception
{
  final ExecutorService executorService = Executors.newFixedThreadPool(5);  // 调用invokeAll的线程
  Thread invokeAllThread = new Thread() {    @Override
    public void run()
    {
      List<Callable<String>> tasks = new ArrayList<Callable<String>>();
      tasks.add(new SleepSecondsCallable("t1", 2));
      tasks.add(new SleepSecondsCallable("t2", 2));
      tasks.add(new RandomTenCharsTask());      // 调用线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
      try
      {
        List<Future<String>> results = executorService
            .invokeAll(tasks);
        System.out.println("wait for the result." + results.size());
      } catch (InterruptedException e)
      {
        System.out
            .println("I was wait,but my thread was interrupted.");
        e.printStackTrace();
      }    }
  };  invokeAllThread.start();  Thread.sleep(200);  invokeAllThread.interrupt();  executorService.shutdown();}

invokeAllThread 线程调用了ExecutorService.invokeAll(),在等待任务执行完成的时候,

invokeAllThread被别的线程中断了。这个时候,

ExecutorService.invokeAll()会抛出java.lang.InterruptedException,任务t1和t2都被终止抛出java.lang.InterruptedException: sleep interrupted。

也就是说一旦 ExecutorService.invokeAll()方法产生了异常,线程池中还没有完成的任务会被取消执行。

这篇关于ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1136073

相关文章

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

Mysql虚拟列的使用场景

《Mysql虚拟列的使用场景》MySQL虚拟列是一种在查询时动态生成的特殊列,它不占用存储空间,可以提高查询效率和数据处理便利性,本文给大家介绍Mysql虚拟列的相关知识,感兴趣的朋友一起看看吧... 目录1. 介绍mysql虚拟列1.1 定义和作用1.2 虚拟列与普通列的区别2. MySQL虚拟列的类型2

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

关于@MapperScan和@ComponentScan的使用问题

《关于@MapperScan和@ComponentScan的使用问题》文章介绍了在使用`@MapperScan`和`@ComponentScan`时可能会遇到的包扫描冲突问题,并提供了解决方法,同时,... 目录@MapperScan和@ComponentScan的使用问题报错如下原因解决办法课外拓展总结@

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

Linux使用fdisk进行磁盘的相关操作

《Linux使用fdisk进行磁盘的相关操作》fdisk命令是Linux中用于管理磁盘分区的强大文本实用程序,这篇文章主要为大家详细介绍了如何使用fdisk进行磁盘的相关操作,需要的可以了解下... 目录简介基本语法示例用法列出所有分区查看指定磁盘的区分管理指定的磁盘进入交互式模式创建一个新的分区删除一个存

C#使用HttpClient进行Post请求出现超时问题的解决及优化

《C#使用HttpClient进行Post请求出现超时问题的解决及优化》最近我的控制台程序发现有时候总是出现请求超时等问题,通常好几分钟最多只有3-4个请求,在使用apipost发现并发10个5分钟也... 目录优化结论单例HttpClient连接池耗尽和并发并发异步最终优化后优化结论我直接上优化结论吧,

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学