Java源码分析----Future

2024-08-30 09:58
文章标签 java 分析 源码 future

本文主要是介绍Java源码分析----Future,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一般使用多线程操作的时候会使用Thread+Runnable进行处理,但是这种方式中,Runnable是没有返回值的,假设我们需要获取Runnable的返回值,可能需要如下特殊处理,伪代码如下

String returnValue1 = "";
String returnValue2 = "";
CountDownLatch cdl = ....
new Thread(()->{// xxxx操作returnValue1 = "返回值";cdl.countDown
})new Thread(()->{// xxxx操作returnValue2 = "返回值";cdl.countDown
});
cdl wait// 程序阻塞在这print returnValue1
print returnValue2

当Runnable运行完并且赋值完毕则通知CDL,最后主线程在wait处等待两个线程执行完毕,然后获取Runnable的返回值。
这样的做法比较麻烦,而JDK提供了一个叫做Future的东西,他实现了上述的功能,且使用上更加的简便,看下例子

ExecutorService executorService = Executors.newFixedThreadPool(1);Future<String> future = executorService.submit(() -> {try {Thread.sleep(100000);} catch (InterruptedException e) {e.printStackTrace();}return "xxxx";});try {String value = future.get();System.out.println(value);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

操作上更加的简便,调用Future的get方法的时候,就类似cdl wait+获取returnValue1

submit方法

那么下面看下其中是如何实现的,先看下submit方法,实现在java.util.concurrent.AbstractExecutorService中

    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();// 将Callable转换成RunnableFuture,它继承了Runnable和FutureRunnableFuture<T> ftask = newTaskFor(task);// 线程池的execute方法,参数类型为Runnableexecute(ftask);return ftask;}

从submit方法可以看出,submit也是执行的execute方法,虽然参数不一样,但是其中Callable转换成Runnable,即RunnableFuture的实现,并将其返回,也就是上述例子中的Future。

这里和第一个例子做类比,这里返回的Future共包含几个功能,简化了使用

  • 赋值->returnValue1=“xxx”
  • 阻塞->cdl.wait
  • 取值

newTaskFor

newTaskFor方法如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public FutureTask(Runnable runnable, V result) {// runnable封装成callablethis.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}

返回一个FutureTask,即RunnableFuture的实现类,也是一个Runnable的子类,当调用execute方法的时候,就会执行FutureTask的run方法,这里先不看run方法实现,先看下FutureTask的内部属性

     /*** NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/// 当前Future的状态值,加了volatile修改,则代表一个线程改变后,马上对另外线程可见private volatile int state;private static final int NEW          = 0;// 初始状态private static final int COMPLETING   = 1;// 执行完毕,还未进行赋值private static final int NORMAL       = 2;// 最终完成状态private static final int EXCEPTIONAL  = 3;// 出现异常private static final int CANCELLED    = 4;// 取消状态private static final int INTERRUPTING = 5;// 正被中断private static final int INTERRUPTED  = 6;// 被中断private Callable<V> callable;private Object outcome; // 返回值,不一定是Callable的返回值,出现异常的时候放的是异常对象private volatile Thread runner;// 执行run方法的线程private volatile WaitNode waiters;//阻塞等待的节点

run方法

这时候再看下run方法

    public void run() {// 状态不为初始化值,证明已经执行过,直接返回// 状态为NEW,则将runner设置为当前线程,如果失败,证明别的线程已经在操作,直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;// 状态为初始化状态才执行,因为有可能被中断或者调用cancel取消if (c != null && state == NEW) {V result;boolean ran;try {// 执行用户定义的call方法,并取的返回值result = c.call();// 执行成功ran = true;} catch (Throwable ex) {// 有异常出现,返回值置空,设置成执行失败result = null;ran = false;// 设置状态,和设置返回值为异常对象setException(ex);}if (ran)// 执行成功,设置返回值set(result);}} finally {// runner在状态值改变之后才能设置为空,否则可能出现多个线程执行run的情况runner = null;// 中断处理int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

setException和set方法

其中主要看setException和set方法

    protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}

调用了set或者setException方法,状态会变成COMPLETING,执行完成后,会将状态设置为NORMAL或者EXCEPTIONAL,而成功时outcome是正常返回值,失败则是Throwable对象。

finishCompletion方法

最后,调用finishCompletion将阻塞的线程唤醒,遍历waiters,调用unpark唤醒线程,处理和AQS有相似之处

 private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;}

get方法

get方法有两种形式,一种是程序会一直等待结果返回,而另外一种是有等待时间的,当时间到了之后,还未返回,则会抛出异常

    public V get() throws InterruptedException, ExecutionException {int s = state;// 还未完成,则进入awaitDone,阻塞线程if (s <= COMPLETING)s = awaitDone(false, 0L);// 完成之后调用方法获取返回值return report(s);}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;// 还未完成,则进入awaitDone,阻塞线程// 时间到了之后会返回状态值s,如果此时还未完成,那么抛出异常if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);}

可以看到两种方法实现是类似的,但是有时间参数的方法,在指定时间内无法获得返回值的话是会抛出异常的,这点在使用的时候需要注意
awaitDone方法会在状态为未完成状态下阻塞线程,当完成或者指定时间到达的时候返回当前状态,此时有两种情况

  • 如果没有时间参数:返回完成状态(>COMPLETING的)、
  • 如果有时间参数:返回当时的状态,有可能是NEW,COMPLETING或者其他

接下来看下awaitDone方法实现

    private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 是否有指定时间,如果没有则为0// 如果有,则deadLine为当前时间+超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;// 表示当前线程节点boolean queued = false;// 是否在排队for (;;) {if (Thread.interrupted()) {// 当前线程被中断removeWaiter(q);// 从链表中移除当前线程节点throw new InterruptedException();}int s = state;if (s > COMPLETING) {// 正常完成状态,直接返回if (q != null)q.thread = null;return s;}// 还在执行当中,但是从上可以知道,这种状态很快变化为最终状态// 所以这里使用yield而不是阻塞可能就是这个原因吧else if (s == COMPLETING) Thread.yield();else if (q == null)// 状态为NEW且为第一次循环,则代表还未结束处理,则先构造一个线程节点q = new WaitNode();else if (!queued)// 在上一个分支判断后,进入下一次循环,如果还是NEW的状态// 则将线程节点挂在链表头部queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {// 如果有时间参数,则阻塞特定时间nanos = deadline - System.nanoTime();if (nanos <= 0L) {// 减少得到deadLine已经到了,则将该节点移除,并返回当前状态removeWaiter(q);return state;}// 阻塞特定时间LockSupport.parkNanos(this, nanos);}else// 没有时间参数的话则一直阻塞知道整个任务完成LockSupport.park(this);}}

这里逻辑不太难懂,总结一下分支:

  • 线程如果被中断,则从连接移除当前线程并返回异常
  • 如果为正常完成状态,则直接返回状态值
  • 如果为COMPLETING状态则让出CPU,重新进入循环重新判断
  • 如果为NEW状态且第一次循环则创建线程节点
  • 如果为NEW状态且非第一次循环,则将线程节点挂在链表头部
  • 如果为NEW状态且已经挂在了链表头部但是有时间参数则阻塞特定时间
  • 如果为NEW状态且已经挂在了链表头部但是没有时间参数则一直阻塞直到任务完成

从上可以知道,一个线程调用get方法后,最多会执行4次循环:

  1. 创建线程节点
  2. 节点加入链表
  3. 阻塞
  4. 任务完成唤醒后再进行一次判断并返回

可以看到这里的循环次数还是比较多的,相当于先来几次自旋再阻塞。

cancel方法

Future还提供了取消任务的入口,即cancel方法,内部将对应的线程进行中断,使正在执行的线程退出

    public boolean cancel(boolean mayInterruptIfRunning) {// 状态为NEW // 如果mayInterruptIfRunning为true,状态设置成INTERRUPTING状态// 否则设置成CANCELLED// 如果状态不为NEW或者说状态设置失败了,则返回falseif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // 如果设置为true,那么取出当前正在执行的线程,并将其中断// 最后设置中断完成状态if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 唤醒阻塞线程finishCompletion();}return true;}
  • mayInterruptIfRunning为true:状态设置为INTERRUPTING,设置运行run方法的线程的中断标志
  • mayInterruptIfRunning为false:状态设置为CANCELLED

综合run方法的逻辑,cancel方法不能取消非NEW状态的任务,且只是设置了一个中断位的标志,如果run方法已经执行到判断状态位后的代码准备运行或者已经运行了,那么cancel还是无法终止任务的执行

题外话

Dubbo实现了自己的Future,整体的交互过程其实是类似的,但是逻辑会比JDK自带的Future会简单一点,因为其中没有多个线程对Future进行get的操作,所以从get的性能上讲,Dubbo的会快一点

这篇关于Java源码分析----Future的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120501

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

Java进阶13讲__第12讲_1/2

多线程、线程池 1.  线程概念 1.1  什么是线程 1.2  线程的好处 2.   创建线程的三种方式 注意事项 2.1  继承Thread类 2.1.1 认识  2.1.2  编码实现  package cn.hdc.oop10.Thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听