FutureTask源码解读

2024-03-18 10:18
文章标签 源码 解读 futuretask

本文主要是介绍FutureTask源码解读,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 背景与简介

Future是Java执行异步任务时的常用接口。我们通常会往ExecutorService中提交一个Callable/Runnable并得到一个Future对象,Future对象表示异步计算的结果,支持获取结果,取消计算等操作。在Java提供的Executor框架中,Future的默认实现为java.util.concurrent.FutureTask。本文针对FutureTask的源码进行分析与解读。

可以看到,FutureTask实现了RunnableFuture, 而RunnableFuture的JavaDoc对Runnable接口的run方法有了更精确的描述:run方法将该Future设置为计算的结果,除非计算被取消。

2. 源码解读

2.1 生命周期状态

FutureTask内置一个被volatile修饰的state变量。按照生命周期的阶段可以分为:

  • NEW 初始状态
  • COMPLETING 任务已经执行完(正常或者异常),准备赋值结果
  • NORMAL 任务已经正常执行完,并已将任务返回值赋值到结果
  • EXCEPTIONAL 任务执行失败,并将异常赋值到结果
  • CANCELLED 取消
  • INTERRUPTING 准备尝试中断执行任务的线程
  • INTERRUPTED 对执行任务的线程进行中断(未必中断到)


可以看到NEW为起始状态,而NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTED这些状态为终止状态,而COMPLETING和INTERRUPTING为中间暂时状态。

2.2 内部结构

  1. Callable callable
    内部封装的Callable对象。如果通过构造函数传的是Runnable对象,FutureTask会通过调用 Executors.callable(runnable, result) ,把Runnable对象封装成一个callable。
  2. Object outcome
    用于保存计算结果或者异常信息。
  3. volatile Thread runner
    用来运行callable的线程。
  4. volatile WaitNode waiters
    FutureTask中用了Trieber Stack来保存等待的线程。

2.3 run方法


public void run() {/* 
     * state为NEW且对runner变量CAS成功。对state的判断写在前面,是一种优化。
     */if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;/*
             * 是否成功运行。
             * 之所以用了这样一个标志位,而不是把set方法写在try中call调用的后一句,
             * 是为了不想捕获set方法出现的异常。
             */boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {/* 
         * 
         * 要清楚,即便在runner被清为null后,仍然有可能有线程会进入到run方法的外层try块。
         * 举例:线程A和B都在执行第一行的if语句读到state == NEW,线程A成功cas了runner,并执行到此处。
         *       在此过程中线程B都没拿到CPU时间片。此时线程B一旦拿到时间片就能进到外层try块。
         *
         * 为了避免线程B重复执行任务,必须在set/setException方法被调用,才能把runner清为null。
         * 这时候其他线程即便进入到了外层try块,也一定能够读到state != NEW,从而避免任务重复执行。
         */runner = null;/*
         * 因为任务执行过程中由于cancel方法的调用,状态为INTERRUPTING,
         * 令当前线程等待INTERRUPTING状态变为INTERRUPTED。
         * 这是为了不想让中断操作逃逸出run方法以至于线程在执行后续操作时被中断。
         */int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {// 通过CAS状态来确认计算没有被取消,结果也没有被设置过。if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}protected void setException(Throwable t) {// 通过CAS状态来确认计算没有被取消,结果也没有被设置过。if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}
}private void finishCompletion() {for (WaitNode q; (q = waiters) != null;) {// 必须将栈顶CAS为null,否则重读栈顶并重试。if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 遍历并唤醒栈中节点对应的线程。for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;// 将next域置为null,这样对GC友好。q.next = null; q = next;}break;}}/*
     * done方法是暴露给子类的一个钩子方法。
     *
     * 这个方法在ExecutorCompletionService.QueueingFuture中的override实现是把结果加到阻塞队列里。
     * CompletionService谁用谁知道,奥秘全在这。
     */done();/* 
     * callable置为null主要为了减少内存开销,
     * 更多可以了解JVM memory footprint相关资料。
     */callable = null;
}private void handlePossibleCancellationInterrupt(int s) {/*
     * 这里的主要目的就是等调用cancel方法的线程完成中断。    
     * 以防止中断操作逃逸出run或者runAndReset方法,影响后续操作。
     *
     * 实际上,当前调用cancel方法的线程不一定能够中断到本线程。
     * 有可能cancel方法里读到runner是null,甚至有可能是其它并发调用run/runAndReset方法的线程。
     * 但是也没办法判断另一个线程在cancel方法中读到的runner到底是什么,所以索性自旋让出CPU时间片也没事。
     */if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield();/*
     * 下面的代码在JDK8中已经被注释掉了。
     * 因为在原来的设计中,是想把cancel方法设置的中断位给清除的。
     * 但是实际上也应该允许调用FutureTask的线程使用中断作为线程间通信的机制,
     * 这里没办法区分中断位到底是不是来自cancel方法的调用。
     */// assert state == INTERRUPTED;// We want to clear any interrupt we may have received from// cancel(true).  However, it is permissible to use interrupts// as an independent mechanism for a task to communicate with// its caller, and there is no way to clear only the// cancellation interrupt.//// Thread.interrupted();
}

2.4 get方法


public V get() throws InterruptedException, ExecutionException {int s = state;// NEW或者COMPLETING。if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 完成赋值if (s > COMPLETING) {// 如果q已经被初始化了,为了GC需要清q.thread。if (q != null)q.thread = null;return s;}// COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环。else if (s == COMPLETING)Thread.yield();// 初始化节点,重试一次循环。else if (q == null)q = new WaitNode();// queued记录是否已经入栈,此处准备将节点压栈。else if (!queued)/*
             *  这是Treiber Stack算法入栈的逻辑。
             *  Treiber Stack是一个基于CAS的无锁并发栈实现,
             */queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 如果有时限,判断是否超时,未超时则park剩下的时间。else if (timed) {nanos = deadline - System.nanoTime();// 超时,移除栈中节点。if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}
}/**
 * 清理用于保存等待线程栈里的节点。
 * 所谓节点无效就是内部的thread为null,
 * 一般有以下几种情况:
 * 1. 节点调用get超时。
 * 2. 节点调用get中断。
 * 3. 节点调用get拿到task的状态值(> COMPLETING)。
 *
 * 此方法干了两件事情:
 * 1. 置标记参数node的thread为null。
 * 2. 清理栈中的无效节点。
 *
 * 如果在遍历过程中发现有竞争则重新遍历栈。
 */
private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null;retry:for (;;) {          // restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;// 如果当前节点仍有效,则置pred为当前节点,继续遍历。if (q.thread != null)pred = q;/*
                 * 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
                 * 如果前驱节点已无效,则重新遍历waiters栈。
                 */else if (pred != null) {pred.next = s;if (pred.thread == null)continue retry;}/*
                 * 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
                 * 失败的话重新遍历waiters栈。
                 */else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}break;}}
}/**
 * 导出结果。
 */
private V report(int s) throws ExecutionException {Object x = outcome;// 正常执行完计算任务。if (s == NORMAL)return (V)x;// 取消。if (s >= CANCELLED)throw new CancellationException();// 执行计算任务时发生异常。throw new ExecutionException((Throwable)x);
}

2.5 cancel方法


public boolean cancel(boolean mayInterruptIfRunning) {/*
     * 在状态还为NEW的时候,根据参数中的是否允许传递,
     * 将状态流转到INTERRUPTING或者CANCELLED。
     */if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {if (mayInterruptIfRunning) {try {// 中断runner线程。Thread t = runner;if (t != null)t.interrupt();} finally {UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 该方法上文已经分析过。finishCompletion();}return true;
}

3. FutureTask存在的问题

 

3.1 cancel(true)调用interrupt的线程对象

FutureTask的run方法的进入条件是

state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

假设有两个线程A和B调用run方法,线程C调用cancel方法。
时刻1: 线程A和B同时读到state == NEW。
时刻2: 线程A成功对runner变量CAS进入run方法主体。
时刻3: 线程C调用cancel方法,成功将状态CAS为CANCELLED。
时刻4: 线程A调用finally中的runner = null。
时刻5: 线程B开始执行run方法第一句if的后半句,成功将runner变量CAS到线程B。
时刻6: 线程C读到runner为线程B,准备对线程B进行interrupt()
时刻7: 线程A调用handlePossibleCancellationInterrupt等待状态从INTERRUPTING流转至INTERRUPTED。
时刻8: 线程B被中断。

这里的问题是,调用cancel方法的线程C中断的是实质上没有对callable进行call调用的线程B,而线程A还试图防止中断操作逃逸出run方法。
这个东西在Future的JavaDoc上说了很含糊,如下所示:

* @param mayInterruptIfRunning {@code true} if the thread executing this* task should be interrupted; otherwise, in-progress tasks are allowed* to complete

上面的情况到底线程A和B哪个算是the thread executing this task说不清。

3.2 内存占用问题

通过阅读源码,发现FutureTask还是存在一个隐形的内存占用问题的,或者按照《Effective Java》上说的应该叫无意识的对象保留。
这个问题就是在FutureTask计算完成后,可能内部用于保存等待线程的栈留有一些已经无用的等待节点。

时刻1: 某线程调用get,已经入等待栈,此时waiters为该线程对应节点。
时刻2: 有大量线程通过调用get试图获取计算结果,get -> awaitDone方法中,经过两轮循环都读到状态是NEW的话,此时它们节点已经被初始化过了,但还没开始入队。
时刻3: 有线程调用run方法,通过run -> set -> finishCompletion,将waiters置为null,并唤醒了已经入栈的那个线程。
时刻4: 调用awaitDone方法的那些线程再试图入队的话,后面循环会发现状态已经是NORMAL了,但是waiters栈此时不为空,而且再也没法被清掉了。这样下来,该FutureTask内部可能会留有一些的无效节点。具体会留多少实际上取决于那个瞬间有多少线程准备执行以及多少能够成功CAS。

queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

这篇关于FutureTask源码解读的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/822025

相关文章

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

基于Java医院药品交易系统详细设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W+,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码+数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多的人  Java精品实战案例《600套》 2023-2025年最值得选择的Java毕业设计选题大全:1000个热

美容美发店营销版微信小程序源码

打造线上生意新篇章 一、引言:微信小程序,开启美容美发行业新纪元 在数字化时代,微信小程序以其便捷、高效的特点,成为了美容美发行业营销的新宠。本文将带您深入了解美容美发营销微信小程序,探讨其独特优势及如何助力商家实现业务增长。 二、微信小程序:美容美发行业的得力助手 拓宽客源渠道:微信小程序基于微信社交平台,轻松实现线上线下融合,帮助商家快速吸引潜在客户,拓宽客源渠道。 提升用户体验:

风水研究会官网源码系统-可展示自己的领域内容-商品售卖等

一款用于展示风水行业,周易测算行业,玄学行业的系统,并支持售卖自己的商品。 整洁大气,非常漂亮,前端内容均可通过后台修改。 大致功能: 支持前端内容通过后端自定义支持开启关闭会员功能,会员等级设置支持对接官方支付支持添加商品类支持添加虚拟下载类支持自定义其他类型字段支持生成虚拟激活卡支持采集其他站点文章支持对接收益广告支持文章评论支持积分功能支持推广功能更多功能,搭建完成自行体验吧! 原文

HTML5文旅文化旅游网站模板源码

文章目录 1.设计来源文旅宣传1.1 登录界面演示1.2 注册界面演示1.3 首页界面演示1.4 文旅之行界面演示1.5 文旅之行文章内容界面演示1.6 关于我们界面演示1.7 文旅博客界面演示1.8 文旅博客文章内容界面演示1.9 联系我们界面演示 2.效果和源码2.1 动态效果2.2 源代码2.3 源码目录 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh

mediasoup 源码分析 (八)分析PlainTransport

mediasoup 源码分析 (六)分析PlainTransport 一、接收裸RTP流二、mediasoup 中udp建立过程 tips 一、接收裸RTP流 PlainTransport 可以接收裸RTP流,也可以接收AES加密的RTP流。源码中提供了一个通过ffmpeg发送裸RTP流到mediasoup的脚本,具体地址为:mediasoup-demo/broadcaste

Java并发编程—阻塞队列源码分析

在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便。今天我们来讨论另外一类容器:阻塞队列。   在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了D

线程池ThreadPoolExecutor类源码分析

Java并发编程:线程池的使用   在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:   如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。   那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

ConcurrentHashMap之源码分析

集合是编程中最常用的数据结构。而谈到并发,几乎总是离不开集合这类高级数据结构的支持。比如两个线程需要同时访问一个中间临界区(Queue),比如常会用缓存作为外部文件的副本(HashMap)。这篇文章主要分析jdk1.5的3种并发集合类型(concurrent,copyonright,queue)中的ConcurrentHashMap,让我们从原理上细致的了解它们,能够让我们在深度项目开发中获益非浅