看图学源码之FutureTask

2023-12-24 23:28
文章标签 源码 futuretask 看图学

本文主要是介绍看图学源码之FutureTask,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RunnableFuture

源码学习:

成员变量

任务的运行状态的转化

image-20231224220654494

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;/**可取消的异步计算。该类提供了Future的基本实现,包括启动和取消计算的方法,查询计算是否完成以及获取计算结果的方法。只有在计算完成后才能获取结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,就无法重新启动或取消计算(除非使用runAndReset方法调用计算)。FutureTask可以用来包装一个Callable或Runnable对象。由于FutureTask实现了Runnable接口,因此可以将FutureTask提交给Executor执行。 除了作为独立的类使用外,该类还提供了一些受保护的功能,这些功能在创建自定义任务类时可能会有用。*/
public class FutureTask<V> implements RunnableFuture<V> {/**此任务的运行状态,初始为NEW。运行状态仅在set、setException和cancel方法中转换为终态。在完成过程中,状态可能会暂时变为COMPLETING(在设置结果时)或INTERRUPTING(仅在中断执行者以满足cancel(true)时)。从这些中间状态到最终状态的转换使用更便宜的有序/懒惰写入,因为这些值是唯一的且不会进一步修改。* Possible state transitions:* NEW -> COMPLETING -> NORMAL  正常结束* NEW -> COMPLETING -> EXCEPTIONAL   异常结束* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 状态值: 表示任务运行的状态private volatile int state;// 新建 或者 正在运行private static final int NEW          = 0;// 中间状态(任务执行完了,但是结果集(正结果/ 异常) 没有设置到 outcome)private static final int COMPLETING   = 1;// 正常执行完成(结果集设置到outcome之后,正常结束)private static final int NORMAL       = 2;// 异常执行完成(结果集设置到outcome之后,异常结束)private static final int EXCEPTIONAL  = 3;// 取消private static final int CANCELLED    = 4;// 中断(中间值)[但是还没有中断]private static final int INTERRUPTING = 5;// 中断完成,最终状态private static final int INTERRUPTED  = 6;/** The underlying callable; nulled out after running */// 执行目标private Callable<V> callable;/** The result to return or exception to throw from get() */// 结果集private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 执行任务的线程private volatile Thread runner;/** Treiber stack of waiting threads */// get 阻塞的时候,使用 WaitNode{物理结构:链表;逻辑结构:栈}去存储阻塞的线程private volatile WaitNode waiters;public FutureTask(Callable<V> callable) {if (callable == null)  throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);  // 适配器的方式this.state = NEW;       // ensure visibility of callable}public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}/*** 简单的链表节点,用于记录Treiber堆栈中的等待线程。*/static final class WaitNode {volatile Thread thread;volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long stateOffset;private static final long runnerOffset;private static final long waitersOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}}}

run()

任务只能执行一次

image-20231224221445999

public void run() {//  状态 是New   并且 cas 成功的把当前线程设置到  runner 才能执行后续的方法,否则就直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 当前 要执行的任务存在,并且  状态 是New 才会调用目标逻辑  c.call()if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();// 目标逻辑执行成功ran = true;} catch (Throwable ex) {// 目标逻辑执行 失败 ,结果 置为 nullresult = null;ran = false;// 设置异常结果集setException(ex);}if (ran)// 设置正常结果集set(result);}} finally {// 在任务状态被确定之前,runner必须非空,以防止对run()方法的并发调用。runner = null;// 在将runner设置为null之后,必须重新读取任务的状态,以防止泄漏的中断。int s = state;if (s >= INTERRUPTING)  // 处于中断状态,执行中断后续逻辑handlePossibleCancellationInterrupt(s);}
}@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}/*将此Future的结果设置为给定的值,除非此Future已经被设置或已取消。在计算成功完成时,此方法由run方法在内部调用。
*/protected void set(V v) {// cas 的方式把状态变为  COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 结果 result outcome = v;// cas 的方式把状态变为最终状态: NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 执行后续操作finishCompletion();}}/*将此Future报告为ExecutionException,将给定的throwable作为其原因,除非此Future已经被设置或已取消。在计算失败时,此方法由run方法在内部调用。
*/protected void setException(Throwable t) {// cas 的方式把状态变为  COMPLETING ,设置成功if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 设置结果集为 异常outcome = t;// cas 的方式把状态变为最终状态: EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 执行后续操作finishCompletion();}}/*
确保任何来自可能的cancel(true)取消操作的中断仅在run或runAndReset方法中传递给任务的目的。
*/private void handlePossibleCancellationInterrupt(int s) {//  解释了在等待中断信号时使用自旋等待的目的: 通过中断来取消任务的执行。然而,可能存在一种情况,即中断线程在  有机会中断当前线程  之前被阻塞。为了等待中断信号的到来,代码使用了自旋等待的策略。if (s == INTERRUPTING)while (state == INTERRUPTING) // 处于中间状态的时候就 让出 cpuThread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// 解释了在state等于INTERRUPTED时的处理逻辑// 它使用断言(assert)来确保任务的状态为INTERRUPTED。断言通常用于在代码中插入一些检查,以确保某些条件为真。如果断言的条件为假,将会抛出一个AssertionError异常。// Thread.interrupted();// 解释了清除可能来自cancel(true)取消操作的中断的目的。但是,中断也可以作为一个独立的机制,用于任务与其调用者之间的通信,并且没有办法只清除取消中断。因此,为了清除中断,代码调用了Thread.interrupted()方法。// Thread.interrupted()方法用于清除当前线程的中断状态,并返回之前的中断状态。这样做的目的是确保任务的中断状态被清除,以便后续的代码或操作不会受到中断的影响。}
finishCompletion()

image-20231224221706281

    /*** 移除并唤醒所有等待的线程,调用done()方法,并将callable置为null。*/private void finishCompletion() {// assert state > COMPLETING;// 循环获取等待队列中的等待节点 waiters, 等待节点里面保存了等待任务完成的线程for (WaitNode q; (q = waiters) != null;) {// 要是cas 的方式成功的将等待队列 waitersOffset 设置为 null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 循环处理每一个等待的节点for (;;) {// 取出当前等待节点 持有的线程Thread t = q.thread;// 线程存在if (t != null) {// 将等待节点的线程引用设为null,并调用LockSupport.unpark(t)方法来唤醒该线程q.thread = null;LockSupport.unpark(t); //LockSupport.unpark(t)方法用于唤醒一个被阻塞的线程。}WaitNode next = q.next; // 取节点下一个元素if (next == null)// 要是没有后继节点,此时表示已经处理完所有等待节点,退出 死循环break;// 将后继节点置为 nullq.next = null; // unlink to help gc // 节点后移q = next;}break; // (q = waiters) == null 退出循环}}done(); // 调用done()方法来完成任务的执行 —————— 钩子方法/*这段代码是一个保护(protected)方法,当任务转换为已完成状态(isDone)时被调用,无论是正常完成还是通过取消完成。默认实现不执行任何操作。子类可以重写这个方法来调用完成回调或进行记录。注意,在该方法的实现中,您可以查询状态来确定任务是否已被取消。protected void done() { }*/callable = null;        // to reduce footprint  将callable引用设为null,以减少内存占用。}

runAndReset()

任务可以执行多次

  • 和 run() 的区别就是 没有正常的结果设置结果集

image-20231224221750745

/**执行计算,但不设置其结果,然后将该Future重置为初始状态。如果计算遇到异常或被取消,则无法执行重置操作。这个方法设计用于那些本质上需要执行多次的任务。
*/// 执行并重置任务
protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}/*和 run()  的区别就是  没有正常的结果设置结果集*/}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptss = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;
}

get()

image-20231224222000579

    /*** 如果计算被取消  会抛出异常*/public V get() throws InterruptedException, ExecutionException {int s = state;// 此时的状态只有  New  (新建 或者 正在执行) 或者  COMPLETING(任务执行结束,结果集该没有设置成功)if (s <= COMPLETING)// 阻塞等待s = awaitDone(false, 0L);// 否则的话 返回结果集(正常 或者 异常)return report(s);}/**等待方法,根据传入的参数决定是否使用定时等待。如果使用定时等待,则会在指定的时间内  等待完成   或者  在中断或超时时中止。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//[注:] 带有阻塞的都要有 死循环,防止虚假唤醒for (;;) {  // 死循环if (Thread.interrupted()) { // 判断是不是中断removeWaiter(q);  // 移除节点 throw new InterruptedException(); // 抛错} int s = state;if (s > COMPLETING) {// 此时结果设置成功// 当前节点存在,将其持有的线程 置空if (q != null)q.thread = null;// 返回结果,结束阻塞return s;}// 还是在设置结果的状态,让出 cpu else if (s == COMPLETING) // cannot time out yetThread.yield();// 处于 new  状态else if (q == null)// 创建节点q = new WaitNode();// 插入链表else if (!queued)// cas 的方式 头插法queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 超时等待,else if (timed) {nanos = deadline - System.nanoTime();// 时间已过,移除结果,返回状态if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}/***完成任务之后 返回结果或者抛出异常*/@SuppressWarnings("unchecked")private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
removeWaiter(WaitNode node)

image-20231224222121607

    /*** 尝试取消链接超时或中断的等待节点,以避免累积垃圾。内部节点只是简单地取消拼接,而无需使用CAS,因为如果它们被释放器遍历,这样做是无害的。为了避免从已经删除的节点中取消拼接的影响,在明显存在竞争的情况下,列表将被重新遍历。当节点很多时,这会很慢,但我们不希望列表足够长以抵消更高开销的方案。*/private void removeWaiter(WaitNode node) {if (node != null) {// 当前节点存在// 将传入节点的线程引用置为null,表示该节点不再持有线程node.thread = null;retry:// 死循环for (;;) {          // restart on removeWaiter race// 遍历链表 中的所有的 等待节点for (WaitNode pred = null, q = waiters, s; q != null; q = s) {// 取出下一个节点s = q.next;// 要是此等待节点持有的线程不是 nullif (q.thread != null)pred = q; // 将前置节点设置为 当前节点q else if (pred != null) {// 等待节点持有的线程是 null ,但是前置节点 不是 null// 前驱节点的next指向当前节点pred.next = s; // 前置节点持有的线程不存在了,表示存在竞争情况,需要重新开始循环。执行下次死循环if (pred.thread == null) // check for racecontinue retry;}
//等待节点持有的线程是 null ,但是前置节点 是 null, cas的方式成功的将节点下移,当前节点从等待队列中移除,执行下次死循环// cas的方式将waitersOffset处的值从q替换为selse if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry;}// 内层遍历结束,等待集合中没有无效节点break;}}}

get(long timeout, TimeUnit unit)

image-20231224222252206

public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state; // 获取当前对象的状态//  调用awaitDone方法来等待操作完成,如果返回的状态值小于等于COMPLETING,则表示操作未完成,继续等待,如果等待的时间超过了超时时间,则抛出TimeoutException异常。if (s <= COMPLETING &&  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();//将最终的状态值作为参数传递给report方法,并返回report方法的返回值。return report(s);
}

cancel()

image-20231224222312072

    public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;/*mayInterruptIfRunning:true:  可以中断正在执行的任务 INTERRUPTINGfasle: 不可以中断正在执行的任务 CANCELLED*/
// 状态是 new ;cas 的方式把 任务的状态从"NEW"修改为"INTERRUPTING"或"CANCELLED"。如果修改成功,表示取消成功,返回true。try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;  // 尝试中断正在执行任务的线程if (t != null)  // 如果任务的runner不为null,则调用interrupt()方法中断线程。t.interrupt();} finally { // final state// 设置任务最终的状态  cas 的方式将任务的状态修改为"INTERRUPTED"。UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 调用finishCompletion()方法完成任务的处理finishCompletion();}return true; //返回true表示取消成功。}

手撕FutureTask:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;public class FutureTask_<T> implements Runnable {private Future_<T> future;public FutureTask_(Future_<T> future) {this.future = future;}public FutureTask_(Runnable runnable) {this.future = new FutureAdaptive(runnable);}@Overridepublic void run() {try{res = this.future.code();state = 1;}catch (Exception e){res = e;state = 2;}for (Thread thread : threadList){LockSupport.unpark(thread); // 唤醒}}private Object res;private  volatile  int state;private List<Thread>  threadList = new ArrayList<>();public T get(){for (;;){if(state == 0){threadList.add(Thread.currentThread());LockSupport.park();  // 阻塞}else if(state == 1){return (T)res;}else if(state == 2){throw new RuntimeException(res.toString());}}}private class FutureAdaptive implements Future_<T> {public  Runnable runnable;public FutureAdaptive(Runnable runnable) {this.runnable = runnable;}@Overridepublic T code() throws Exception {this.runnable.run();return null;}}
}class MM {public static void main(String[] args){Future_<String> future = new Future_<String>() {@Overridepublic String code() throws Exception {return "future";}};Runnable runnable = new Runnable(){@Overridepublic void run() {System.out.println("runnable");}};FutureTask_<String> future_ = new FutureTask_<String>(future);FutureTask_<String> runnable_ = new FutureTask_<String>(runnable);new Thread(future_).start();new Thread(runnable_).start();System.out.println(future_.get());LockSupport.parkNanos(2*1000*1000*1000);}
}interface Future_<T>{T code() throws Exception;
}

这篇关于看图学源码之FutureTask的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/533478

相关文章

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

red5-server源码

red5-server源码:https://github.com/Red5/red5-server