Java并发编程与技术内幕:Callable、Future、FutureTask、CompletionService

本文主要是介绍Java并发编程与技术内幕:Callable、Future、FutureTask、CompletionService,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

         在上一文章中,笔者介绍了线程池及其内部的原理。今天主要讲的也是和线程相关的内容。一般情况下,使用Runnable接口、Thread实现的线程我们都是无法返回结果的。但是如果对一些场合需要线程返回的结果。就要使用用Callable、Future、FutureTask、CompletionService这几个类。Callable只能在ExecutorService的线程池中跑,但有返回结果,也可以通过返回的Future对象查询执行状态。Future 本身也是一种设计模式,它是用来取得异步任务的结果,

一、基本源码

所以来看看它们的源码信息

1、Callable

看看其源码:

 
  1. public interface Callable<V> {

  2. V call() throws Exception;

  3. }

它只有一个call方法,并且有一个返回V,是泛型。可以认为这里返回V就是线程返回的结果。

ExecutorService接口:线程池执行调度框架

 

 
  1. <T> Future<T> submit(Callable<T> task);

  2. <T> Future<T> submit(Runnable task, T result);

  3. Future<?> submit(Runnable task);

 

2、Future

Future是我们最常见的

 

 
  1. public interface Future<V> {

  2. //试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动 //,则此任务将永不运行。如果任务已经启动,则

  3. //mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返 //回 true,则对 isCancelled()

  4. //的后续调用将始终返回 true。

  5. boolean cancel(boolean mayInterruptIfRunning);

  6.  
  7. //如果在任务正常完成前将其取消,则返回 true。

  8. boolean isCancelled();

  9.  
  10. //如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。

  11. boolean isDone();

  12.  
  13. //等待线程结果返回,会阻塞

  14. V get() throws InterruptedException, ExecutionException;

  15.  
  16. //设置超时时间

  17. V get(long timeout, TimeUnit unit)

  18. throws InterruptedException, ExecutionException, TimeoutException;

  19. }

 

 

3、FutureTask

 

 

从源码看其继承关系如下:

 

其源码如下:

 

 
  1. public class FutureTask<V> implements RunnableFuture<V> {

  2. //真正用来执行线程的类

  3. private final Sync sync;

  4.  
  5. //构造方法1,从Callable来创建FutureTask

  6. public FutureTask(Callable<V> callable) {

  7. if (callable == null)

  8. throw new NullPointerException();

  9. sync = new Sync(callable);

  10. }

  11.  
  12. //构造方法2,从Runnable来创建FutureTask,V就是线程执行返回结果

  13. public FutureTask(Runnable runnable, V result) {

  14. sync = new Sync(Executors.callable(runnable, result));

  15. }

  16. //和Futrue一样

  17. public boolean isCancelled() {

  18. return sync.innerIsCancelled();

  19. }

  20. //和Futrue一样

  21. public boolean isDone() {

  22. return sync.innerIsDone();

  23. }

  24. //和Futrue一样

  25. public boolean cancel(boolean mayInterruptIfRunning) {

  26. return sync.innerCancel(mayInterruptIfRunning);

  27. }

  28.  
  29. //和Futrue一样

  30. public V get() throws InterruptedException, ExecutionException {

  31. return sync.innerGet();

  32. }

  33.  
  34. //和Futrue一样

  35. public V get(long timeout, TimeUnit unit)

  36. throws InterruptedException, ExecutionException, TimeoutException {

  37. return sync.innerGet(unit.toNanos(timeout));

  38. }

  39.  
  40. //线程结束后的操作

  41. protected void done() { }

  42.  
  43. //设置结果

  44. protected void set(V v) {

  45. sync.innerSet(v);

  46. }

  47.  
  48. //设置异常

  49. protected void setException(Throwable t) {

  50. sync.innerSetException(t);

  51. }

  52. //线程执行入口

  53. public void run() {

  54. sync.innerRun();

  55. }

  56.  
  57. //重置

  58. protected boolean runAndReset() {

  59. return sync.innerRunAndReset();

  60. }

  61.  
  62. //这个类才是真正执行、关闭线程的类

  63. private final class Sync extends AbstractQueuedSynchronizer {

  64. private static final long serialVersionUID = -7828117401763700385L;

  65. //线程运行状态

  66. private static final int RUNNING = 1;

  67. private static final int RAN = 2;

  68. private static final int CANCELLED = 4;

  69.  
  70.  
  71. private final Callable<V> callable;

  72. private V result;

  73. private Throwable exception;

  74.  
  75. //线程实例

  76. private volatile Thread runner;

  77. //构造函数

  78. Sync(Callable<V> callable) {

  79. this.callable = callable;

  80. }

  81.  
  82. 。。。。

  83. }

  84. }

FutureTask类是Future 的一个实现,并实现了Runnable,所以可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。 Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。FutureTask类既可以使用new Thread(Runnable r)放到一个新线程中跑,也可以使用ExecutorService.submit(Runnable r)放到线程池中跑,而且两种方式都可以获取返回结果,但实质是一样的,即如果要有返回结果那么构造函数一定要注入一个Callable对象。

 

 

二、应用实例

1、Future实例

 

 
  1. package com.func.axc.futuretask;

  2.  
  3. import java.util.Random;

  4. import java.util.concurrent.Callable;

  5. import java.util.concurrent.ExecutionException;

  6. import java.util.concurrent.ExecutorService;

  7. import java.util.concurrent.Executors;

  8. import java.util.concurrent.Future;

  9.  
  10. /**

  11. * 功能概要:

  12. *

  13. * @author linbingwen

  14. * @since 2016年6月8日

  15. */

  16. public class FutureTest {

  17.  
  18. /**

  19. * @author linbingwen

  20. * @since 2016年6月8日

  21. * @param args

  22. */

  23. public static void main(String[] args) {

  24. System.out.println("main Thread begin at:"+ System.nanoTime());

  25. ExecutorService executor = Executors.newCachedThreadPool();

  26. HandleCallable task1 = new HandleCallable("1");

  27. HandleCallable task2 = new HandleCallable("2");

  28. HandleCallable task3 = new HandleCallable("3");

  29. Future<Integer> result1 = executor.submit(task1);

  30. Future<Integer> result2 = executor.submit(task2);

  31. Future<Integer> result3 = executor.submit(task3);

  32. executor.shutdown();

  33. try {

  34. Thread.sleep(1000);

  35. } catch (InterruptedException e1) {

  36. e1.printStackTrace();

  37. }

  38. try {

  39. System.out.println("task1运行结果:"+result1.get());

  40. System.out.println("task2运行结果:"+result2.get());

  41. System.out.println("task3运行结果:"+result3.get());

  42. } catch (InterruptedException e) {

  43. e.printStackTrace();

  44. } catch (ExecutionException e) {

  45. e.printStackTrace();

  46. }

  47. System.out.println("main Thread finish at:"+ System.nanoTime());

  48. }

  49.  
  50. }

  51.  
  52. class HandleCallable implements Callable<Integer>{

  53. private String name;

  54. public HandleCallable(String name) {

  55. this.name = name;

  56. }

  57.  
  58. @Override

  59. public Integer call() throws Exception {

  60. System.out.println("task"+ name + "开始进行计算");

  61. Thread.sleep(3000);

  62. int sum = new Random().nextInt(300);

  63. int result = 0;

  64. for (int i = 0; i < sum; i++)

  65. result += i;

  66. return result;

  67. }

  68. }

执行结果:

 

 

 

 

 

2、FutureTask

方法一、直接通过New Thread来启动线程

 

 
  1. package com.func.axc.futuretask;

  2.  
  3. import java.util.Random;

  4. import java.util.concurrent.Callable;

  5. import java.util.concurrent.ExecutionException;

  6. import java.util.concurrent.FutureTask;

  7.  
  8. import org.springframework.scheduling.config.Task;

  9.  
  10. /**

  11. * 功能概要:

  12. *

  13. * @author linbingwen

  14. * @since 2016年5月31日

  15. */

  16. public class FutrueTaskTest {

  17.  
  18. public static void main(String[] args) {

  19. //采用直接启动线程的方法

  20. System.out.println("main Thread begin at:"+ System.nanoTime());

  21. MyTask task1 = new MyTask("1");

  22. FutureTask<Integer> result1 = new FutureTask<Integer>(task1);

  23. Thread thread1 = new Thread(result1);

  24. thread1.start();

  25.  
  26. MyTask task2 = new MyTask("2");

  27. FutureTask<Integer> result2 = new FutureTask<Integer>(task2);

  28. Thread thread2 = new Thread(result2);

  29. thread2.start();

  30.  
  31. try {

  32. Thread.sleep(1000);

  33. } catch (InterruptedException e1) {

  34. e1.printStackTrace();

  35. }

  36.  
  37. try {

  38. System.out.println("task1返回结果:" + result1.get());

  39. System.out.println("task2返回结果:" + result2.get());

  40. } catch (InterruptedException e) {

  41. e.printStackTrace();

  42. } catch (ExecutionException e) {

  43. e.printStackTrace();

  44. }

  45.  
  46. System.out.println("main Thread finish at:"+ System.nanoTime());

  47.  
  48. }

  49. }

  50.  
  51. class MyTask implements Callable<Integer> {

  52. private String name;

  53.  
  54. public MyTask(String name) {

  55. this.name = name;

  56. }

  57.  
  58. @Override

  59. public Integer call() throws Exception {

  60. System.out.println("task"+ name + "开始进行计算");

  61. Thread.sleep(3000);

  62. int sum = new Random().nextInt(300);

  63. int result = 0;

  64. for (int i = 0; i < sum; i++)

  65. result += i;

  66. return result;

  67. }

  68.  
  69. }


执行结果:

 

 


方法二、通过线程池来启动线程

 

 
  1. package com.func.axc.futuretask;

  2.  
  3. import java.util.Random;

  4. import java.util.concurrent.Callable;

  5. import java.util.concurrent.ExecutionException;

  6. import java.util.concurrent.ExecutorService;

  7. import java.util.concurrent.Executors;

  8. import java.util.concurrent.Future;

  9.  
  10. /**

  11. * 功能概要:

  12. *

  13. * @author linbingwen

  14. * @since 2016年5月31日

  15. */

  16. public class FutrueTaskTest2 {

  17.  
  18. public static void main(String[] args) {

  19. System.out.println("main Thread begin at:"+ System.nanoTime());

  20. ExecutorService executor = Executors.newCachedThreadPool();

  21. MyTask2 task1 = new MyTask2("1");

  22. MyTask2 task2 = new MyTask2("2");

  23. Future<Integer> result1 = executor.submit(task1);

  24. Future<Integer> result2 = executor.submit(task2);

  25. executor.shutdown();

  26.  
  27. try {

  28. Thread.sleep(1000);

  29. } catch (InterruptedException e1) {

  30. e1.printStackTrace();

  31. }

  32.  
  33. try {

  34. System.out.println("task1返回结果:" + result1.get());

  35. System.out.println("task2返回结果:" + result2.get());

  36. } catch (InterruptedException e) {

  37. e.printStackTrace();

  38. } catch (ExecutionException e) {

  39. e.printStackTrace();

  40. }

  41.  
  42. System.out.println("main Thread finish at:"+ System.nanoTime());

  43.  
  44. }

  45. }

  46.  
  47. class MyTask2 implements Callable<Integer> {

  48. private String name;

  49.  
  50. public MyTask2(String name) {

  51. this.name = name;

  52. }

  53.  
  54. @Override

  55. public Integer call() throws Exception {

  56. System.out.println("task"+ name + "开始进行计算");

  57. Thread.sleep(3000);

  58. int sum = new Random().nextInt(300);

  59. int result = 0;

  60. for (int i = 0; i < sum; i++)

  61. result += i;

  62. return result;

  63. }

  64.  
  65. }

执行结果:

 

 

 

三、CompletionService

 

这个光看其单词,就可以猜到它应该是一个线程执行完成后相关的服务,没错。它就是一个将线程池执行结果放入到一个Blockqueueing的类。那么它和Future或FutureTask有什么不同呢?其实在上面的例子中,笔者用的实例可能不太好。如果在线程池中我们使用Future或FutureTask来取得返回结果,比如。我们开了100条线程。但是这些线程的执行时间是未知的。但是我们又需要返回结果。每执行一条线程就根据结果做一次相应的操作。如果是Future或FutureTask。我们只能通过一个循环,不断的遍历线程池里的线程。取得其执行状态。然后再来取结果。这样效率就太低了,有可能发生一条线程执行完毕了,但我们不能立刻知道它处理完成了。还得通过一个循环来判断。基本上面的这种问题,所以出了CompletionService。

     CompletionService原理不是很难,它就是将一组线程的执行结果放入一个BlockQueueing当中。这里线程的执行结果放入到Blockqueue的顺序只和这个线程的执行时间有关。和它们的启动顺序无关。并且你无需自己在去写很多判断哪个线程是否执行完成,它里面会去帮你处理。

首先看看其源码:

 

 
  1. package java.util.concurrent;

  2.  
  3. public interface CompletionService<V> {

  4. //提交线程任务

  5. Future<V> submit(Callable<V> task);

  6. //提交线程任务

  7. Future<V> submit(Runnable task, V result);

  8. //阻塞等待

  9. Future<V> take() throws InterruptedException;

  10. //非阻塞等待

  11. Future<V> poll();

  12. //带时间的非阻塞等待

  13. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

  14. }

上面只是一个接口类,其实现类如下:

 

 

 
  1. package java.util.concurrent;

  2.  
  3. public class ExecutorCompletionService<V> implements CompletionService<V> {

  4. private final Executor executor;//线程池类

  5. private final AbstractExecutorService aes;

  6. private final BlockingQueue<Future<V>> completionQueue;//存放线程执行结果的阻塞队列

  7.  
  8. //内部封装的一个用来执线程的FutureTask

  9. private class QueueingFuture extends FutureTask<Void> {

  10. QueueingFuture(RunnableFuture<V> task) {

  11. super(task, null);

  12. this.task = task;

  13. }

  14. protected void done() { completionQueue.add(task); }//线程执行完成后调用此函数将结果放入阻塞队列

  15. private final Future<V> task;

  16. }

  17.  
  18. private RunnableFuture<V> newTaskFor(Callable<V> task) {

  19. if (aes == null)

  20. return new FutureTask<V>(task);

  21. else

  22. return aes.newTaskFor(task);

  23. }

  24.  
  25. private RunnableFuture<V> newTaskFor(Runnable task, V result) {

  26. if (aes == null)

  27. return new FutureTask<V>(task, result);

  28. else

  29. return aes.newTaskFor(task, result);

  30. }

  31.  
  32. //构造函数,这里一般传入一个线程池对象executor的实现类

  33. public ExecutorCompletionService(Executor executor) {

  34. if (executor == null)

  35. throw new NullPointerException();

  36. this.executor = executor;

  37. this.aes = (executor instanceof AbstractExecutorService) ?

  38. (AbstractExecutorService) executor : null;

  39. this.completionQueue = new LinkedBlockingQueue<Future<V>>();//默认的是链表阻塞队列

  40. }

  41.  
  42. //构造函数,可以自己设定阻塞队列

  43. public ExecutorCompletionService(Executor executor,

  44. BlockingQueue<Future<V>> completionQueue) {

  45. if (executor == null || completionQueue == null)

  46. throw new NullPointerException();

  47. this.executor = executor;

  48. this.aes = (executor instanceof AbstractExecutorService) ?

  49. (AbstractExecutorService) executor : null;

  50. this.completionQueue = completionQueue;

  51. }

  52. //提交线程任务,其实最终还是executor去提交

  53. public Future<V> submit(Callable<V> task) {

  54. if (task == null) throw new NullPointerException();

  55. RunnableFuture<V> f = newTaskFor(task);

  56. executor.execute(new QueueingFuture(f));

  57. return f;

  58. }

  59. //提交线程任务,其实最终还是executor去提交

  60. public Future<V> submit(Runnable task, V result) {

  61. if (task == null) throw new NullPointerException();

  62. RunnableFuture<V> f = newTaskFor(task, result);

  63. executor.execute(new QueueingFuture(f));

  64. return f;

  65. }

  66.  
  67. public Future<V> take() throws InterruptedException {

  68. return completionQueue.take();

  69. }

  70.  
  71. public Future<V> poll() {

  72. return completionQueue.poll();

  73. }

  74.  
  75. public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {

  76. return completionQueue.poll(timeout, unit);

  77. }

  78.  
  79. }

从源码中可以知道。最终还是线程还是提交到Executor当中去运行,所以构造函数中需要Executor参数来实例化。而每次有线程执行完成后往阻塞队列添加一个Future。

 

这是上面的RunnableFuture,这是每次往线程池是放入的线程。

 

 
  1. public interface RunnableFuture<V> extends Runnable, Future<V> {

  2. void run();

  3. }


接下来以两个例子来说明其使用

 

1、与Future的区别使用:

自定义一个Callable

 

 
  1. class HandleFuture<Integer> implements Callable<Integer> {

  2.  
  3. private Integer num;

  4.  
  5. public HandleFuture(Integer num) {

  6. this.num = num;

  7. }

  8.  
  9. @Override

  10. public Integer call() throws Exception {

  11. Thread.sleep(3*100);

  12. System.out.println(Thread.currentThread().getName());

  13. return num;

  14. }

  15.  
  16. }

首先是Futuer

 

 

 
  1. public static void FutureTest() throws InterruptedException, ExecutionException {

  2. System.out.println("main Thread begin:");

  3. ExecutorService executor = Executors.newCachedThreadPool();

  4. List<Future<Integer>> result = new ArrayList<Future<Integer>>();

  5. for (int i = 0;i<10;i++) {

  6. Future<Integer> submit = executor.submit(new HandleFuture(i));

  7. result.add(submit);

  8. }

  9. executor.shutdown();

  10. for (int i = 0;i<10;i++) {//一个一个等待返回结果

  11. System.out.println("返回结果:"+result.get(i).get());

  12. }

  13. System.out.println("main Thread end:");

  14. }

执行结果:

 

 


从输出结果可以看出,我们只能一个一个阻塞的取出。这中间肯定会浪费一定的时间在等待上。如7返回了。但是前面1-6都没有返回。那么7就得等1-6输出才能输出。

 

接下来换成CompletionService来做:

 

 
  1. public static void CompleTest() throws InterruptedException, ExecutionException {

  2. System.out.println("main Thread begin:");

  3. ExecutorService executor = Executors.newCachedThreadPool();

  4. // 构建完成服务

  5. CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);

  6. for (int i = 0;i<10;i++) {

  7. completionService.submit(new HandleFuture(i));

  8. }

  9. for (int i = 0;i<10;i++) {//一个一个等待返回结果

  10. System.out.println("返回结果:"+completionService.take().get());

  11. }

  12. System.out.println("main Thread end:");

  13. }


输出结果:

 

 

可以看出,结果的输出和线程的放入顺序无关系。每一个线程执行成功后,立刻就输出。

这篇关于Java并发编程与技术内幕:Callable、Future、FutureTask、CompletionService的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/755239

相关文章

springboot健康检查监控全过程

《springboot健康检查监控全过程》文章介绍了SpringBoot如何使用Actuator和Micrometer进行健康检查和监控,通过配置和自定义健康指示器,开发者可以实时监控应用组件的状态,... 目录1. 引言重要性2. 配置Spring Boot ActuatorSpring Boot Act

使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)

《使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)》在现代软件开发中,处理JSON数据是一项非常常见的任务,无论是从API接口获取数据,还是将数据存储为JSON格式,解析... 目录1. 背景介绍1.1 jsON简介1.2 实际案例2. 准备工作2.1 环境搭建2.1.1 添加

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

SpringBoot基于MyBatis-Plus实现Lambda Query查询的示例代码

《SpringBoot基于MyBatis-Plus实现LambdaQuery查询的示例代码》MyBatis-Plus是MyBatis的增强工具,简化了数据库操作,并提高了开发效率,它提供了多种查询方... 目录引言基础环境配置依赖配置(Maven)application.yml 配置表结构设计demo_st

在Ubuntu上部署SpringBoot应用的操作步骤

《在Ubuntu上部署SpringBoot应用的操作步骤》随着云计算和容器化技术的普及,Linux服务器已成为部署Web应用程序的主流平台之一,Java作为一种跨平台的编程语言,具有广泛的应用场景,本... 目录一、部署准备二、安装 Java 环境1. 安装 JDK2. 验证 Java 安装三、安装 mys

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python