本文主要是介绍Java并发编程与技术内幕:Callable、Future、FutureTask、CompletionService,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在上一文章中,笔者介绍了线程池及其内部的原理。今天主要讲的也是和线程相关的内容。一般情况下,使用Runnable接口、Thread实现的线程我们都是无法返回结果的。但是如果对一些场合需要线程返回的结果。就要使用用Callable、Future、FutureTask、CompletionService这几个类。Callable只能在ExecutorService的线程池中跑,但有返回结果,也可以通过返回的Future对象查询执行状态。Future 本身也是一种设计模式,它是用来取得异步任务的结果,
一、基本源码
所以来看看它们的源码信息
1、Callable
看看其源码:
-
public interface Callable<V> {
-
V call() throws Exception;
-
}
它只有一个call方法,并且有一个返回V,是泛型。可以认为这里返回V就是线程返回的结果。
ExecutorService接口:线程池执行调度框架
-
<T> Future<T> submit(Callable<T> task);
-
<T> Future<T> submit(Runnable task, T result);
-
Future<?> submit(Runnable task);
2、Future
Future是我们最常见的
-
public interface Future<V> {
-
//试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动 //,则此任务将永不运行。如果任务已经启动,则
-
//mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返 //回 true,则对 isCancelled()
-
//的后续调用将始终返回 true。
-
boolean cancel(boolean mayInterruptIfRunning);
-
//如果在任务正常完成前将其取消,则返回 true。
-
boolean isCancelled();
-
//如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
-
boolean isDone();
-
//等待线程结果返回,会阻塞
-
V get() throws InterruptedException, ExecutionException;
-
//设置超时时间
-
V get(long timeout, TimeUnit unit)
-
throws InterruptedException, ExecutionException, TimeoutException;
-
}
3、FutureTask
从源码看其继承关系如下:
其源码如下:
-
public class FutureTask<V> implements RunnableFuture<V> {
-
//真正用来执行线程的类
-
private final Sync sync;
-
//构造方法1,从Callable来创建FutureTask
-
public FutureTask(Callable<V> callable) {
-
if (callable == null)
-
throw new NullPointerException();
-
sync = new Sync(callable);
-
}
-
//构造方法2,从Runnable来创建FutureTask,V就是线程执行返回结果
-
public FutureTask(Runnable runnable, V result) {
-
sync = new Sync(Executors.callable(runnable, result));
-
}
-
//和Futrue一样
-
public boolean isCancelled() {
-
return sync.innerIsCancelled();
-
}
-
//和Futrue一样
-
public boolean isDone() {
-
return sync.innerIsDone();
-
}
-
//和Futrue一样
-
public boolean cancel(boolean mayInterruptIfRunning) {
-
return sync.innerCancel(mayInterruptIfRunning);
-
}
-
//和Futrue一样
-
public V get() throws InterruptedException, ExecutionException {
-
return sync.innerGet();
-
}
-
//和Futrue一样
-
public V get(long timeout, TimeUnit unit)
-
throws InterruptedException, ExecutionException, TimeoutException {
-
return sync.innerGet(unit.toNanos(timeout));
-
}
-
//线程结束后的操作
-
protected void done() { }
-
//设置结果
-
protected void set(V v) {
-
sync.innerSet(v);
-
}
-
//设置异常
-
protected void setException(Throwable t) {
-
sync.innerSetException(t);
-
}
-
//线程执行入口
-
public void run() {
-
sync.innerRun();
-
}
-
//重置
-
protected boolean runAndReset() {
-
return sync.innerRunAndReset();
-
}
-
//这个类才是真正执行、关闭线程的类
-
private final class Sync extends AbstractQueuedSynchronizer {
-
private static final long serialVersionUID = -7828117401763700385L;
-
//线程运行状态
-
private static final int RUNNING = 1;
-
private static final int RAN = 2;
-
private static final int CANCELLED = 4;
-
private final Callable<V> callable;
-
private V result;
-
private Throwable exception;
-
//线程实例
-
private volatile Thread runner;
-
//构造函数
-
Sync(Callable<V> callable) {
-
this.callable = callable;
-
}
-
。。。。
-
}
-
}
FutureTask类是Future 的一个实现,并实现了Runnable,所以可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。 Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。FutureTask类既可以使用new Thread(Runnable r)放到一个新线程中跑,也可以使用ExecutorService.submit(Runnable r)放到线程池中跑,而且两种方式都可以获取返回结果,但实质是一样的,即如果要有返回结果那么构造函数一定要注入一个Callable对象。
二、应用实例
1、Future实例
-
package com.func.axc.futuretask;
-
import java.util.Random;
-
import java.util.concurrent.Callable;
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
import java.util.concurrent.Future;
-
/**
-
* 功能概要:
-
*
-
* @author linbingwen
-
* @since 2016年6月8日
-
*/
-
public class FutureTest {
-
/**
-
* @author linbingwen
-
* @since 2016年6月8日
-
* @param args
-
*/
-
public static void main(String[] args) {
-
System.out.println("main Thread begin at:"+ System.nanoTime());
-
ExecutorService executor = Executors.newCachedThreadPool();
-
HandleCallable task1 = new HandleCallable("1");
-
HandleCallable task2 = new HandleCallable("2");
-
HandleCallable task3 = new HandleCallable("3");
-
Future<Integer> result1 = executor.submit(task1);
-
Future<Integer> result2 = executor.submit(task2);
-
Future<Integer> result3 = executor.submit(task3);
-
executor.shutdown();
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e1) {
-
e1.printStackTrace();
-
}
-
try {
-
System.out.println("task1运行结果:"+result1.get());
-
System.out.println("task2运行结果:"+result2.get());
-
System.out.println("task3运行结果:"+result3.get());
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
System.out.println("main Thread finish at:"+ System.nanoTime());
-
}
-
}
-
class HandleCallable implements Callable<Integer>{
-
private String name;
-
public HandleCallable(String name) {
-
this.name = name;
-
}
-
@Override
-
public Integer call() throws Exception {
-
System.out.println("task"+ name + "开始进行计算");
-
Thread.sleep(3000);
-
int sum = new Random().nextInt(300);
-
int result = 0;
-
for (int i = 0; i < sum; i++)
-
result += i;
-
return result;
-
}
-
}
执行结果:
2、FutureTask
方法一、直接通过New Thread来启动线程
-
package com.func.axc.futuretask;
-
import java.util.Random;
-
import java.util.concurrent.Callable;
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.FutureTask;
-
import org.springframework.scheduling.config.Task;
-
/**
-
* 功能概要:
-
*
-
* @author linbingwen
-
* @since 2016年5月31日
-
*/
-
public class FutrueTaskTest {
-
public static void main(String[] args) {
-
//采用直接启动线程的方法
-
System.out.println("main Thread begin at:"+ System.nanoTime());
-
MyTask task1 = new MyTask("1");
-
FutureTask<Integer> result1 = new FutureTask<Integer>(task1);
-
Thread thread1 = new Thread(result1);
-
thread1.start();
-
MyTask task2 = new MyTask("2");
-
FutureTask<Integer> result2 = new FutureTask<Integer>(task2);
-
Thread thread2 = new Thread(result2);
-
thread2.start();
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e1) {
-
e1.printStackTrace();
-
}
-
try {
-
System.out.println("task1返回结果:" + result1.get());
-
System.out.println("task2返回结果:" + result2.get());
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
System.out.println("main Thread finish at:"+ System.nanoTime());
-
}
-
}
-
class MyTask implements Callable<Integer> {
-
private String name;
-
public MyTask(String name) {
-
this.name = name;
-
}
-
@Override
-
public Integer call() throws Exception {
-
System.out.println("task"+ name + "开始进行计算");
-
Thread.sleep(3000);
-
int sum = new Random().nextInt(300);
-
int result = 0;
-
for (int i = 0; i < sum; i++)
-
result += i;
-
return result;
-
}
-
}
执行结果:
方法二、通过线程池来启动线程
-
package com.func.axc.futuretask;
-
import java.util.Random;
-
import java.util.concurrent.Callable;
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
import java.util.concurrent.Future;
-
/**
-
* 功能概要:
-
*
-
* @author linbingwen
-
* @since 2016年5月31日
-
*/
-
public class FutrueTaskTest2 {
-
public static void main(String[] args) {
-
System.out.println("main Thread begin at:"+ System.nanoTime());
-
ExecutorService executor = Executors.newCachedThreadPool();
-
MyTask2 task1 = new MyTask2("1");
-
MyTask2 task2 = new MyTask2("2");
-
Future<Integer> result1 = executor.submit(task1);
-
Future<Integer> result2 = executor.submit(task2);
-
executor.shutdown();
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e1) {
-
e1.printStackTrace();
-
}
-
try {
-
System.out.println("task1返回结果:" + result1.get());
-
System.out.println("task2返回结果:" + result2.get());
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
System.out.println("main Thread finish at:"+ System.nanoTime());
-
}
-
}
-
class MyTask2 implements Callable<Integer> {
-
private String name;
-
public MyTask2(String name) {
-
this.name = name;
-
}
-
@Override
-
public Integer call() throws Exception {
-
System.out.println("task"+ name + "开始进行计算");
-
Thread.sleep(3000);
-
int sum = new Random().nextInt(300);
-
int result = 0;
-
for (int i = 0; i < sum; i++)
-
result += i;
-
return result;
-
}
-
}
执行结果:
三、CompletionService
这个光看其单词,就可以猜到它应该是一个线程执行完成后相关的服务,没错。它就是一个将线程池执行结果放入到一个Blockqueueing的类。那么它和Future或FutureTask有什么不同呢?其实在上面的例子中,笔者用的实例可能不太好。如果在线程池中我们使用Future或FutureTask来取得返回结果,比如。我们开了100条线程。但是这些线程的执行时间是未知的。但是我们又需要返回结果。每执行一条线程就根据结果做一次相应的操作。如果是Future或FutureTask。我们只能通过一个循环,不断的遍历线程池里的线程。取得其执行状态。然后再来取结果。这样效率就太低了,有可能发生一条线程执行完毕了,但我们不能立刻知道它处理完成了。还得通过一个循环来判断。基本上面的这种问题,所以出了CompletionService。
CompletionService原理不是很难,它就是将一组线程的执行结果放入一个BlockQueueing当中。这里线程的执行结果放入到Blockqueue的顺序只和这个线程的执行时间有关。和它们的启动顺序无关。并且你无需自己在去写很多判断哪个线程是否执行完成,它里面会去帮你处理。
首先看看其源码:
-
package java.util.concurrent;
-
public interface CompletionService<V> {
-
//提交线程任务
-
Future<V> submit(Callable<V> task);
-
//提交线程任务
-
Future<V> submit(Runnable task, V result);
-
//阻塞等待
-
Future<V> take() throws InterruptedException;
-
//非阻塞等待
-
Future<V> poll();
-
//带时间的非阻塞等待
-
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
-
}
上面只是一个接口类,其实现类如下:
-
package java.util.concurrent;
-
public class ExecutorCompletionService<V> implements CompletionService<V> {
-
private final Executor executor;//线程池类
-
private final AbstractExecutorService aes;
-
private final BlockingQueue<Future<V>> completionQueue;//存放线程执行结果的阻塞队列
-
//内部封装的一个用来执线程的FutureTask
-
private class QueueingFuture extends FutureTask<Void> {
-
QueueingFuture(RunnableFuture<V> task) {
-
super(task, null);
-
this.task = task;
-
}
-
protected void done() { completionQueue.add(task); }//线程执行完成后调用此函数将结果放入阻塞队列
-
private final Future<V> task;
-
}
-
private RunnableFuture<V> newTaskFor(Callable<V> task) {
-
if (aes == null)
-
return new FutureTask<V>(task);
-
else
-
return aes.newTaskFor(task);
-
}
-
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
-
if (aes == null)
-
return new FutureTask<V>(task, result);
-
else
-
return aes.newTaskFor(task, result);
-
}
-
//构造函数,这里一般传入一个线程池对象executor的实现类
-
public ExecutorCompletionService(Executor executor) {
-
if (executor == null)
-
throw new NullPointerException();
-
this.executor = executor;
-
this.aes = (executor instanceof AbstractExecutorService) ?
-
(AbstractExecutorService) executor : null;
-
this.completionQueue = new LinkedBlockingQueue<Future<V>>();//默认的是链表阻塞队列
-
}
-
//构造函数,可以自己设定阻塞队列
-
public ExecutorCompletionService(Executor executor,
-
BlockingQueue<Future<V>> completionQueue) {
-
if (executor == null || completionQueue == null)
-
throw new NullPointerException();
-
this.executor = executor;
-
this.aes = (executor instanceof AbstractExecutorService) ?
-
(AbstractExecutorService) executor : null;
-
this.completionQueue = completionQueue;
-
}
-
//提交线程任务,其实最终还是executor去提交
-
public Future<V> submit(Callable<V> task) {
-
if (task == null) throw new NullPointerException();
-
RunnableFuture<V> f = newTaskFor(task);
-
executor.execute(new QueueingFuture(f));
-
return f;
-
}
-
//提交线程任务,其实最终还是executor去提交
-
public Future<V> submit(Runnable task, V result) {
-
if (task == null) throw new NullPointerException();
-
RunnableFuture<V> f = newTaskFor(task, result);
-
executor.execute(new QueueingFuture(f));
-
return f;
-
}
-
public Future<V> take() throws InterruptedException {
-
return completionQueue.take();
-
}
-
public Future<V> poll() {
-
return completionQueue.poll();
-
}
-
public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
-
return completionQueue.poll(timeout, unit);
-
}
-
}
从源码中可以知道。最终还是线程还是提交到Executor当中去运行,所以构造函数中需要Executor参数来实例化。而每次有线程执行完成后往阻塞队列添加一个Future。
这是上面的RunnableFuture,这是每次往线程池是放入的线程。
-
public interface RunnableFuture<V> extends Runnable, Future<V> {
-
void run();
-
}
接下来以两个例子来说明其使用
1、与Future的区别使用:
自定义一个Callable
-
class HandleFuture<Integer> implements Callable<Integer> {
-
private Integer num;
-
public HandleFuture(Integer num) {
-
this.num = num;
-
}
-
@Override
-
public Integer call() throws Exception {
-
Thread.sleep(3*100);
-
System.out.println(Thread.currentThread().getName());
-
return num;
-
}
-
}
首先是Futuer
-
public static void FutureTest() throws InterruptedException, ExecutionException {
-
System.out.println("main Thread begin:");
-
ExecutorService executor = Executors.newCachedThreadPool();
-
List<Future<Integer>> result = new ArrayList<Future<Integer>>();
-
for (int i = 0;i<10;i++) {
-
Future<Integer> submit = executor.submit(new HandleFuture(i));
-
result.add(submit);
-
}
-
executor.shutdown();
-
for (int i = 0;i<10;i++) {//一个一个等待返回结果
-
System.out.println("返回结果:"+result.get(i).get());
-
}
-
System.out.println("main Thread end:");
-
}
执行结果:
从输出结果可以看出,我们只能一个一个阻塞的取出。这中间肯定会浪费一定的时间在等待上。如7返回了。但是前面1-6都没有返回。那么7就得等1-6输出才能输出。
接下来换成CompletionService来做:
-
public static void CompleTest() throws InterruptedException, ExecutionException {
-
System.out.println("main Thread begin:");
-
ExecutorService executor = Executors.newCachedThreadPool();
-
// 构建完成服务
-
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
-
for (int i = 0;i<10;i++) {
-
completionService.submit(new HandleFuture(i));
-
}
-
for (int i = 0;i<10;i++) {//一个一个等待返回结果
-
System.out.println("返回结果:"+completionService.take().get());
-
}
-
System.out.println("main Thread end:");
-
}
输出结果:
可以看出,结果的输出和线程的放入顺序无关系。每一个线程执行成功后,立刻就输出。
这篇关于Java并发编程与技术内幕:Callable、Future、FutureTask、CompletionService的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!