Callable、Future和FutureTask原理解析

2024-01-21 16:48

本文主要是介绍Callable、Future和FutureTask原理解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

返回结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它不能返回一个值或抛出一个受检查的异常。Runnable接口:

public interface Runnable {public abstract void run();
}
  • 1
  • 2
  • 3

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

许多任务实际上都是存在延迟的计算,对于这些任务,Callable是一种更好的抽象:它会返回一个值,并可能抛出一个异常。Callable接口:

public interface Callable<V> {/*** Computes a result, or throws an exception if unable to do so.** @return computed result* @throws Exception if unable to compute a result*/V call() throws Exception;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有生命周期的。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长时间,因此通常希望可以取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,对于已经开始执行的任务,只有当它们响应中断时才能取消。

Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future接口:

public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDone();V get() throws InterruptedException, ExecutionException;V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

  1. cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

  2. isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  3. isDone方法表示任务是否已经完成,若任务完成,则返回true;

  4. get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

  5. get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说实际上Future提供了三种功能:

  1. 判断任务是否完成;
  2. 中断任务;
  3. 获取任务执行结果。

Future与Callable的关系与ExecutorService与Executor的关系对应。

FutureTask

Future只是一个接口,无法直接创建对象,因此有了FutureTask。
我们先来看下FutureTask的实现:

public class FutureTask<V> implements RunnableFuture<V>
  • 1

FutureTask类实现了RunnableFuture接口:

public interface RunnableFuture<V> extends Runnable, Future<V> {void run();
}
  • 1
  • 2
  • 3

RunnableFuture继承了Runnable和Future接口,而FutureTask实现了RunnableFuture接口。

FutureTask的继承关系和方法如图所示:
FutureTask

FutureTask是一个可取消的异步计算,FutureTask 实现了Future的基本方法,提供start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成, 那么计算就不能再次启动或是取消。

一个FutureTask 可以用来包装一个 Callable 或是一个Runnable对象。因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit)给一个Excutor执行(excution). 它同时实现了Callable, 所以也可以作为Future得到Callable的返回值。

FutureTask有两个很重要的属性分别是state和runner, FutureTask之所以支持canacel操作,也是因为这两个属性。
其中state为枚举值:

    private volatile int state; // 注意volatile关键字/*** 在构建FutureTask时设置,同时也表示内部成员callable已成功赋值,* 一直到worker thread完成FutureTask中的run();*/private static final int NEW = 0;/*** woker thread在处理task时设定的中间状态,处于该状态时,* 说明worker thread正准备设置result.*/private static final int COMPLETING = 1;/*** 当设置result结果完成后,FutureTask处于该状态,代表过程结果,* 该状态为最终状态final state,(正确完成的最终状态)*/private static final int NORMAL = 2;/*** 同上,只不过task执行过程出现异常,此时结果设值为exception,* 也是final state*/private static final int EXCEPTIONAL = 3;/*** final state, 表明task被cancel(task还没有执行就被cancel的状态).*/private static final int CANCELLED = 4;/*** 中间状态,task运行过程中被interrupt时,设置的中间状态*/private static final int INTERRUPTING = 5;/*** final state, 中断完毕的最终状态,几种情况,下面具体分析*/private static final int INTERRUPTED = 6;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

state初始化为NEW。只有在set, setException和cancel方法中state才可以转变为终态。在任务完成期间,state的值可能为COMPLETING或INTERRUPTING。
state有四种可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

其他成员变量:

    /** The underlying callable; nulled out after running */private Callable<V> callable;   // 具体run运行时会调用其方法call(),并获得结果,结果时置为null./** The result to return or exception to throw from get() */private Object outcome; // non-volatile, protected by state reads/writes   没必要为votaile,因为其是伴随state 进行读写,而state是FutureTask的主导因素。/** The thread running the callable; CASed during run() */private volatile Thread runner;   //具体的worker thread./** Treiber stack of waiting threads */  private volatile WaitNode waiters;     //Treiber stack 并发stack数据结构,用于存放阻塞在该futuretask#get方法的线程。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

注:Treiber算法可参见:Lock-Free 算法

下面分析下Task的状态变化,也就一个任务的生命周期:
创建一个FutureTask首先调用构造方法:

 public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}
  • 1
  • 2
  • 3
  • 4

此时将state设置为初始态NEW。这里注意Runnable是怎样转换为Callable的,看下this.callable = Executors.callable(runnable, result); 调用Executors.callable:

public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);
}
  • 1
  • 2
  • 3
  • 4
  • 5
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

其实就是通过Callable的call方法调用Runnable的run方法,把传入的 T result 作为callable的返回结果;

当创建完一个Task通常会提交给Executors来执行,当然也可以使用Thread来执行,Thread的start()方法会调用Task的run()方法。看下FutureTask的run()方法的实现:

public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

首先判断任务的状态,如果任务状态不是new,说明任务状态已经改变(说明他已经走了上面4种可能变化的一种,比如caller调用了cancel,此时状态为Interrupting, 也说明了上面的cancel方法,task没运行时,就interrupt, task得不到运行,总是返回);

如果状态是new, 判断runner是否为null, 如果为null, 则把当前执行任务的线程赋值给runner,如果runner不为null, 说明已经有线程在执行,返回。此处使用cas来赋值worker thread是保证多个线程同时提交同一个FutureTask时,确保该FutureTask的run只被调用一次, 如果想运行多次,使用runAndReset()方法。

这里

!UNSAFE.compareAndSwapObject(this, runnerOffset,                                   null, Thread.currentThread())
  • 1

语义相当于

if (this.runner == null ){this.runner = Thread.currentThread();
}
  • 1
  • 2
  • 3

使用compareAndSwap能够保证原子性。关于compareAndSwap的相关内容,可参看:http://huangyunbin.iteye.com/blog/1942369

接着开始执行任务,如果要执行的任务不为空,并且state为New就执行,可以看到这里调用了Callable的call方法。如果执行成功则set结果,如果出现异常则setException。最后把runner设为null。

接着看下set方法:

protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

如果现在的状态是NEW就把状态设置成COMPLETING,然后设置成NORMAL。这个执行流程的状态变化就是: NEW->COMPLETING->NORMAL。

最后执行finishCompletion()方法:

private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

finishCompletion()会解除所有阻塞的worker thread, 调用done()方法,将成员变量callable设为null。这里使用了LockSupport类来解除线程阻塞,关于LockSupport,可参见:LockSupport的park和unpark的基本使用,以及对线程中断的响应性

接下来分析FutureTask非常重要的get方法:

    public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

首先判断FutureTask的状态是否为完成状态,如果是完成状态,说明已经执行过set或setException方法,返回report(s):

    private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

可以看到,如果FutureTask的状态是NORMAL, 即正确执行了set方法,get方法直接返回处理的结果, 如果是取消状态,即执行了setException,则抛出CancellationException异常。

如果get时,FutureTask的状态为未完成状态,则调用awaitDone方法进行阻塞。awaitDone():

    private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

awaitDone方法可以看成是不断轮询查看FutureTask的状态。在get阻塞期间:

  • 如果执行get的线程被中断,则移除FutureTask的所有阻塞队列中的线程(waiters),并抛出中断异常;

  • 如果FutureTask的状态转换为完成状态(正常完成或取消),则返回完成状态;

  • 如果FutureTask的状态变为COMPLETING, 则说明正在set结果,此时让线程等一等;

  • 如果FutureTask的状态为初始态NEW,则将当前线程加入到FutureTask的阻塞线程中去;

  • 如果get方法没有设置超时时间,则阻塞当前调用get线程;如果设置了超时时间,则判断是否达到超时时间,如果到达,则移除FutureTask的所有阻塞列队中的线程,并返回此时FutureTask的状态,如果未到达时间,则在剩下的时间内继续阻塞当前线程。

注:本文源码基于JDK1.7。(1.6通过AQS实现)

这篇关于Callable、Future和FutureTask原理解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/630185

相关文章

使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)

《使用Java解析JSON数据并提取特定字段的实现步骤(以提取mailNo为例)》在现代软件开发中,处理JSON数据是一项非常常见的任务,无论是从API接口获取数据,还是将数据存储为JSON格式,解析... 目录1. 背景介绍1.1 jsON简介1.2 实际案例2. 准备工作2.1 环境搭建2.1.1 添加

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

python解析HTML并提取span标签中的文本

《python解析HTML并提取span标签中的文本》在网页开发和数据抓取过程中,我们经常需要从HTML页面中提取信息,尤其是span元素中的文本,span标签是一个行内元素,通常用于包装一小段文本或... 目录一、安装相关依赖二、html 页面结构三、使用 BeautifulSoup javascript

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和