深读源码-java线程系列之线程池深入解析——未来任务执行流程

本文主要是介绍深读源码-java线程系列之线程池深入解析——未来任务执行流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简介

前面我们一起学习了线程池中普通任务的执行流程,但其实线程池中还有一种任务,叫作未来任务(future task),使用它您可以获取任务执行的结果,它是怎么实现的呢?

建议学习本章前先去看看之前写的《深读源码-java线程系列之自己手写一个线程池进阶版》,有助于理解本章的内容,且那边的代码比较短小,学起来相对容易一些。

问题

(1)线程池中的未来任务是怎么执行的?

(2)我们能学到哪些比较好的设计模式?

(3)对我们未来学习别的框架有什么帮助?

来个栗子

我们还是从一个例子入手,来讲解来章的内容。

我们定义一个线程池,并使用它提交5个任务,这5个任务分别返回0、1、2、3、4,在未来的某一时刻,我们再取用它们的返回值,做一个累加操作。

public class ThreadPoolTest02 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 新建一个固定5个线程的线程池ExecutorService threadPool = Executors.newFixedThreadPool(5);List<Future<Integer>> futureList = new ArrayList<>();// 提交5个任务,分别返回0、1、2、3、4for (int i = 0; i < 5; i++) {int num = i;// 任务执行的结果用Future包装Future<Integer> future = threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("return: " + num);// 返回值return num;});// 把future添加到list中futureList.add(future);}// 任务全部提交完再从future中get返回值,并做累加int sum = 0;for (Future<Integer> future : futureList) {sum += future.get();}System.out.println("sum=" + sum);}
}

这里我们思考两个问题:

(1)如果这里使用普通任务,要怎么写,时间大概是多少?

如果使用普通任务,那么就要把累加操作放到任务里面,而且并不是那么好写(final的问题),总时间大概是1秒多一点。但是,这样有一个缺点,就是累加操作跟任务本身的内容耦合到一起了,后面如果改成累乘,还要修改任务的内容。

(2)如果这里把future.get()放到for循环里面,时间大概是多少?

这个问题我们先不回答,先来看源码分析。

submit()方法

submit方法,它是提交有返回值任务的一种方式,内部使用未来任务(FutureTask)包装,再交给execute()去执行,最后返回未来任务本身。

public <T> Future<T> submit(Callable<T> task) {// 非空检测if (task == null) throw new NullPointerException();// 包装成FutureTaskRunnableFuture<T> ftask = newTaskFor(task);// 交给execute()方法去执行execute(ftask);// 返回futureTaskreturn ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {// 将普通任务包装成FutureTaskreturn new FutureTask<T>(callable);
}

这里的设计很巧妙,实际上这两个方法都是在AbstractExecutorService这个抽象类中完成的,这是模板方法的一种运用。

我们来看看FutureTask的继承体系:

 

FutureTask实现了RunnableFuture接口,而RunnableFuture接口组合了Runnable接口和Future接口的能力,而Future接口提供了get任务返回值的能力。

问题:submit()方法返回的为什么是Future接口而不是RunnableFuture接口或者FutureTask类呢?

答:这是因为submit()返回的结果,对外部调用者只想暴露其get()的能力(Future接口),而不想暴露其run()的能力(Runaable接口)。

FutureTask类的run()方法

经过上一章的学习,我们知道execute()方法最后调用的是task的run()方法,上面我们传进去的任务,最后被包装成了FutureTask,也就是说execute()方法最后会调用到FutureTask的run()方法,所以我们直接看这个方法就可以了。

public void run() {// 状态不为NEW,或者修改为当前线程来运行这个任务失败,则直接返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 真正的任务Callable<V> c = callable;// state必须为NEW时才运行if (c != null && state == NEW) {// 运行的结果V result;boolean ran;try {// 任务执行的地方result = c.call();// 已执行完毕ran = true;} catch (Throwable ex) {result = null;ran = false;// 处理异常setException(ex);}if (ran)// 处理结果set(result);}} finally {// 置空runnerrunner = null;// 处理中断int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}

可以看到代码也比较简单,先做状态的检测,再执行任务,最后处理结果或异常。

执行任务这里没啥问题,让我们看看处理结果或异常的代码。

protected void setException(Throwable t) {// 将状态从NEW置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 返回值置为传进来的异常(outcome为调用get()方法时返回的)outcome = t;// 最终的状态设置为EXCEPTIONALUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state// 调用完成方法finishCompletion();}
}
protected void set(V v) {// 将状态从NEW置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 返回值置为传进来的结果(outcome为调用get()方法时返回的)outcome = v;// 最终的状态设置为NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state// 调用完成方法finishCompletion();}
}

咋一看,这两个方法似乎差不多,不同的是出去的结果不一样且状态不一样,最后都调用了finishCompletion()方法。

private void finishCompletion() {// 如果队列不为空(这个队列实际上为调用者线程)for (WaitNode q; (q = waiters) != null;) {// 置空队列if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {// 调用者线程Thread t = q.thread;if (t != null) {q.thread = null;// 如果调用者线程不为空,则唤醒它LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}// 钩子方法,子类重写done();// 置空任务callable = null;        // to reduce footprint
}

整个run()方法总结下来:

(1)FutureTask有一个状态state控制任务的运行过程,正常运行结束state从NEW->COMPLETING->NORMAL,异常运行结束state从NEW->COMPLETING->EXCEPTIONAL;

(2)FutureTask保存了运行任务的线程runner,它是线程池中的某个线程;

(3)调用者线程是保存在waiters队列中的,它是什么时候设置进去的呢?

(4)任务执行完毕,除了设置状态state变化之外,还要唤醒调用者线程。

调用者线程是什么时候保存在FutureTask中(waiters)的呢?查看构造方法:

public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}

发现并没有相关信息,我们再试想一下,如果调用者不调用get()方法,那么这种未来任务是不是跟普通任务没有什么区别?确实是的哈,所以只有调用get()方法了才有必要保存调用者线程到FutureTask中。

所以,我们来看看get()方法中是什么鬼。

FutureTask类的get()方法

get()方法调用时如果任务未执行完毕,会阻塞直到任务结束。

public V get() throws InterruptedException, ExecutionException {int s = state;// 如果状态小于等于COMPLETING,则进入队列等待if (s <= COMPLETING)s = awaitDone(false, 0L);// 返回结果(异常)return report(s);
}

是不是很清楚,如果任务状态小于等于COMPLETING,则进入队列等待。

private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 我们这里假设不带超时final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {// 处理中断if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 4. 如果状态大于COMPLETING了,则跳出循环并返回// 这是自旋的出口int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}// 如果状态等于COMPLETING,说明任务快完成了,就差设置状态到NORMAL或EXCEPTIONAL和设置结果了// 这时候就让出CPU,优先完成任务else if (s == COMPLETING) // cannot time out yetThread.yield();// 1. 如果队列为空else if (q == null)// 初始化队列(WaitNode中记录了调用者线程)q = new WaitNode();// 2. 未进入队列else if (!queued)// 尝试入队queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 超时处理else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}// 3. 阻塞当前线程(调用者线程)elseLockSupport.park(this);}
}

这里我们假设调用get()时任务还未执行,也就是其状态为NEW,我们试着按上面标示的1、2、3、4走一遍逻辑:

(1)第一次循环,状态为NEW,直接到1处,初始化队列并把调用者线程封装在WaitNode中;

(2)第二次循环,状态为NEW,队列不为空,到2处,让包含调用者线程的WaitNode入队;

(3)第三次循环,状态为NEW,队列不为空,且已入队,到3处,阻塞调用者线程;

(4)假设过了一会任务执行完毕了,根据run()方法的分析最后会unpark调用者线程,也就是3处会被唤醒;

(5)第四次循环,状态肯定大于COMPLETING了,退出循环并返回;

问题:为什么要在for循环中控制整个流程呢,把这里的每一步单独拿出来写行不行?

答:因为每一次动作都需要重新检查状态state有没有变化,如果拿出去写也是可以的,只是代码会非常冗长。这里只分析了get()时状态为NEW,其它的状态也可以自行验证,都是可以保证正确的,甚至两个线程交叉运行(断点的技巧)。

OK,这里返回之后,再看看是怎么处理最终的结果的。

private V report(int s) throws ExecutionException {Object x = outcome;// 任务正常结束if (s == NORMAL)return (V)x;// 被取消了if (s >= CANCELLED)throw new CancellationException();// 执行异常throw new ExecutionException((Throwable)x);
}

还记得前面分析run的时候吗,任务执行异常时是把异常放在outcome里面的,这里就用到了。

(1)如果正常执行结束,则返回任务的返回值;

(2)如果异常结束,则包装成ExecutionException异常抛出;

通过这种方式,线程中出现的异常也可以返回给调用者线程了,不会像执行普通任务那样调用者是不知道任务执行到底有没有成功的。

其它

FutureTask除了可以获取任务的返回值以外,还能够取消任务的执行。

public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;
}

这里取消任务是通过中断执行线程来处理的,有兴趣的同学可以自己分析一下。

回答开篇

如果这里把future.get()放到for循环里面,时间大概是多少?

答:大概会是5秒多一点,因为每提交一个任务,都要阻塞调用者线程直到任务执行完毕,每个任务执行都是1秒多,所以总时间就是5秒多点。

总结

(1)未来任务是通过把普通任务包装成FutureTask来实现的。

(2)通过FutureTask不仅能够获取任务执行的结果,还有感知到任务执行的异常,甚至还可以取消任务;

(3)AbstractExecutorService中定义了很多模板方法,这是一种很重要的设计模式;

(4)FutureTask其实就是典型的异常调用的实现方式,后面我们学习到Netty、Dubbo的时候还会见到这种设计思想的。

彩蛋

RPC框架中异步调用是怎么实现的?

答:RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的。

一般地,通过一个线程(我们叫作远程线程)去调用远程接口,如果是同步调用,则直接让调用者线程阻塞着等待远程线程调用的结果,待结果返回了再返回;如果是异步调用,则先返回一个未来可以获取到远程结果的东西FutureXxx,当然,如果这个FutureXxx在远程结果返回之前调用了get()方法一样会阻塞着调用者线程。

有兴趣的同学可以先去预习一下dubbo的异步调用(它是把Future扔到RpcContext中的)。


原文链接:https://www.cnblogs.com/tong-yuan/p/11795216.html

这篇关于深读源码-java线程系列之线程池深入解析——未来任务执行流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/362645

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06