Future、CompletionService、CompletableFuture

2023-12-23 11:12

本文主要是介绍Future、CompletionService、CompletableFuture,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

线程详解

  • 1.创建线程的3种方法
  • 2.线程实现原理
  • 3.Future
    • 3.1 Future的使用
    • 3.2 Future的局限性
  • 4.CompletionService使用
    • 4.1 CompletionService相较于Future的优点
    • 4.2 CompletionService实现原理
  • 5.CompletableFuture
    • 5.1 异步操作的方法
    • 5.2 等待获取结果
    • 5.3 常用方法
      • 5.3.1 结果处理

1.创建线程的3种方法

1.继承Thread类

class MyThread extends Thread {@Overridepublic void run() {// 线程执行的代码}
}// 创建并启动线程
MyThread myThread = new MyThread();
myThread.start();

继承Thread类, 并重写run方法, 然后创建该类的实例并调用start方法启动线程

2.实现Runnable接口

class MyRunnable implements Runnable {@Overridepublic void run() {// 线程执行的代码}
}// 创建并启动线程
Thread myThread = new Thread(new MyRunnable());
myThread.start();

实现Runnable接口, 并重写run方法, 创建Thread实例并传递Runnable实例,最后调用start方法

3.实现Callable接口, 并实现

class MyCallable implements Callable<Integer>{@Overridepublic Integer call() throws Exception {return 1;}
}// 创建线程池, 并启动
MyCallable myCallable= new MyCallable();
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<Integer> future = executorService.submit(myCallable);

Lambda表达式
因为Runnable和Callable都是函数式接口, 所以都可以使用Lambda表达式进行简化

Runnable runable = () -> {// 无返回值
};Callable callable = () -> {// 有返回值return 1;
};

总结:
上面三种创建线程的方式, 其中继承Thread类或者实现Runnable接口都可以创建线程, 但是它两有一个共同的问题: 没有返回值, 没有返回值, 就无法获取线程执行完的结果

JDK1.5新增了一个Callable接口来解决上面的问题, 但是Callable只能在线程池中提交任务使用

2.线程实现原理

继承Thread类和实现Runnable接口两种方式本身就是一种方式,通过创建Thread实例,然后调用start()方法来创建实例

Callable接口的方式实质上也是通过Thread类来实现的, 我们可以看一下ExecutorService的submit()方法

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}

在这个方法中, 首先将Callable实例封装成一个FutureTask实例, FutureTask实现了RunnableFuture接口,而RunnableFuture又实现了Runnable接口,也就是说封装后的FutureTask仍然只是一个任务实例,此时与线程并没有任何关系,真正建立关系是在execute()方法中

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}……
}

其中的addWorker方法, 该方法是创建一个线程

private boolean addWorker(Runnable firstTask, boolean core) {……w = new Worker(firstTask);final Thread t = w.thread;……
}

其中初始化Worker的方式如下

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;
}

从上面对Callable的分析,我们可以得出结论,所有创建线程的方式都可以归结为一种方式,那就是创建Thread实例

3.Future

3.1 Future的使用

Future主要是配合Callable使用

Callable的call方法可以有返回值,可以声明抛出异常。和Callable配合的有一个Future类,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable的功能要比Runnable强大。

Future<String> future =  Executors.newSingleThreadExecutor().submit(() -> {return "Task completed";
});

1.取消任务
用于取消任务的执行。 参数true表示中断执行任务的线程

boolean canceled = future.cancel(true);

2.检查任务是否完成
用于检查任务是否已经完成。如果任务已经完成,返回true;否则返回false。

if (future.isDone()) {// 任务已完成
} else {// 任务未完成
}

3.处理异常
get()方法用于获取异步任务的结果。如果任务已经完成,它会立即返回结果;否则,它会阻塞直到任务完成。

如果在执行过程中, 线程抛出异常, 使用 try-catch 块来处理异步任务中的异常。

try {String result = future.get();System.out.println(result);
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
}

3.2 Future的局限性

  • 阻塞等待: Future.get()方法是阻塞的,如果任务还没有完成,调用get() 会一直等待。这可能导致程序的响应性变差,特别是在需要等待很长时间的情况下。
  • 取消困难: Future 接口提供了cancel() 方法来取消任务,但这个方法的实现是可选的,而且并不是所有的Future实现都支持取消。如果任务已经开始执行或已经完成,那么取消可能会很困难。
  • 单一结果: Future只能表示单一的异步结果。如果需要处理多个并发任务的结果,可能需要使用更复杂的数据结构,比如CompletionService
  • 缺乏通知机制: Future缺乏内置的通知机制,不能直接注册回调函数,因此在任务完成时无法直接执行某些操作。这使得编写异步代码相对复杂。
  • 异常处理不直观: 异步任务中的异常处理比同步代码更加复杂。在Future 中,如果任务抛出异常,异常会被包装在ExecutionException中,需要在客户端代码中进行处理。
  • 不适合流式处理: Future接口本身并不提供对任务结果的流式处理能力。在需要对异步任务进行流式处理的情况下,可能需要结合其他的编程模型或使用更高级别的抽象,比如CompletableFuture。

4.CompletionService使用

Callable+Future虽然可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。

而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

案例: Future方式

public static void main(String[] args) {//    创建线程池ExecutorService executor = Executors.newFixedThreadPool(2);//    异步向电商S1询价Future<Integer> f1 = executor.submit(() -> getPriceByS1());//    异步向电商S2询价Future<Integer> f2 = executor.submit(() -> getPriceByS2());//    获取电商S1报价并异步保存executor.execute(() -> save(f1.get()));//    获取电商S2报价并异步保存executor.execute(() -> save(f2.get()));
}

在这个案例中, 如果获取S1报价的耗时很长, 那么即使获取S2报价的耗时很短, 也无法让保存S2报价的操作先执行, 因为此时主线程会阻塞在f1.get()操作上

案例: CompletionService方式

public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> {Thread.sleep(2000);return  10;});//异步向电商S2询价cs.submit(() -> {return  20;});//将询价结果异步保存到数据库for (int i = 0; i < 2; i++) {Integer r = cs.take().get();System.out.println(r);}
}

CompletionService更适合处理一组任务, 可以将所有的任务提交到 CompletionService 中,然后按照它们完成的顺序逐个处理结果。

调用take()方法会阻塞等待下一个已完成的任务,这意味着你可以立即处理完成的任务,而不必等待所有任务都完成。

4.1 CompletionService相较于Future的优点

  • 按顺序获取任务完成的结果:
    CompletionService提供了take()和poll()方法,可以按照任务完成的顺序获取结果, 这样你就可以立即处理已经完成任务的结果. 而不用等所有任务都完成后才开始处理
  • 避免阻塞等待:
    CompletionService的take()方法会阻塞等待下一个已完成的任务,这意味着你可以立即处理完成的任务,而不必等待所有任务都完成。相比之下,Future.get() 方法是阻塞的,必须等待特定的Future对象的任务完成。
  • 方便处理一组任务:
    CompletionService更适合处理一组任务,你可以将所有的任务提交到CompletionService中,然后按照它们完成的顺序逐个处理结果。
  • 简化异常处理:
    在 CompletionService中,每个任务的结果都包装在Future对象中,如果任务抛出异常,你可以通过Future的get方法捕获ExecutionException,从而更容易地处理异常。在Future中,一个任务的异常可能会影响其他任务的执行,因为它们共享相同的 ExecutorService。
  • 更灵活的任务提交:
    CompletionService的submit方法接受Runnable或Callable,并返回包装结果的Future对象。这使得任务的提交更加灵活,你可以根据需要选择是否关心任务的结果。
  • 支持多个ExecutorService:
    CompletionService允许你使用不同的ExecutorService实例,这对于将不同类型的任务分配给不同的线程池是很有用的

4.2 CompletionService实现原理

CompletionService内部通过阻塞队列 + FutureTask, 阻塞队列的作用是为了存储那些已经执行完成的任务的Future对象, 它是按照完成先后顺序排序

5.CompletableFuture

CompletableFuture
CompletableFuture是对Future的扩展和增强, 它实现了Future接口, 并在此基础上进行了丰富的扩展,完美弥补了Future的局限性

5.1 异步操作的方法

CompletableFuture提供了4个静态方法来创建异步操作

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync和supplyAsync的区别

runAsyncsupplyAsync
参数接受一个Runnable参数,表达异步执行的任务, 没有返回值接受一个Supplier<U>参数, 表达异步执行的任务, 且有一个返回值
返回值类型CompletableFuture<Void>CompletableFuture<U>,其中 U 是通过 Supplier<U> 提供的结果类型
结果处理任务执行完毕后,不返回任何结果,因此runAsync不会有返回值。任务执行完毕后,会返回一个结果。
用途适用于那些不需要返回结果的异步任务,比如异步地执行一些操作但不需要返回任何值。适用于那些需要返回结果的异步任务,比如异步地获取某个值。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {// 异步执行的任务System.out.println("Task running asynchronously");
});CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 异步执行的任务,并返回结果return "Result of the asynchronous computation";
});

Executor参数的作用
使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

ForkJoinPool.commonPool(), 这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

5.2 等待获取结果

join()get()方法都是用来获取CompletableFuture异步之后的返回值

  • join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。
  • get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
join()get()
抛出异常join()方法不会抛出受检查异常,因此在使用时不需要处理异常,这使得代码更加清晰简洁。get()方法会抛出 InterruptedException 和 ExecutionException 异常,因此需要进行异常处理。在实际应用中,通常需要处理这些异常,以确保对异步任务的正确处理。

总的来说,如果你不需要处理异常,并且可以确保异步任务不会抛出异常,那么使用 join() 方法可能更为方便。如果需要处理异常或者希望更细粒度地控制异常的处理,可以选择使用 get() 方法。

5.3 常用方法

依赖关系

  • thenApply():把前面任务的执行结果,交给后面的Function
  • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

and集合关系

  • thenCombine():合并任务,有返回值
  • thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
  • runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)

or聚合关系

  • applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
  • acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
  • runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)

并行执行

  • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture

结果处理

  • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作
  • exceptionally:返回一个新的CompletableFuture,当前面的CompletableFuture完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture的完成

5.3.1 结果处理

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

whenComplete用于处理异步结果异常, 它允许你注册一个回调函数, 这个函数会在异步计算完成后(无论是正常完成还是出现异常)后执行

whenComplete方法不阻塞线程,它是异步执行的。

action参数是一个BiConsumer,接受两个参数:计算的结果(如果成功完成)和异常(如果异步任务抛出了异常)。

import java.util.concurrent.CompletableFuture;public class WhenCompleteExample {public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// Simulate some computationreturn "Hello, CompletableFuture!";});// Register a callback with whenCompletefuture.whenComplete((result, exception) -> {if (exception == null) {System.out.println("Result: " + result);} else {System.out.println("Exception: " + exception.getMessage());}});// Main thread continues with other workSystem.out.println("Main thread continues with other work...");// Ensure the program doesn't exit before the CompletableFuture completestry {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}
}

在这个例子中,whenComplete 方法注册了一个回调函数,该函数会在异步计算完成时被调用。回调函数根据计算结果是否异常分别进行处理,输出相应的信息。

需要注意的是,whenComplete 方法不会阻塞主线程,因此在主线程继续执行其他工作之前,我们通过 Thread.sleep 确保异步任务有足够的时间完成。在实际应用中,通常会使用更复杂的控制流或者 CompletableFuture.join() 方法等方式来等待异步任务的完成。

whenComplete和whenCompleteAsync的区别

public static void main(String[] args) {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("Async Thread: " + Thread.currentThread().getName());return "Hello, CompletableFuture!";});// 使用 whenCompleteCompletableFuture whenCompleteFuture = future.whenComplete((result, exception) -> {System.out.println("whenComplete Thread: " + Thread.currentThread().getName());});// 使用 whenCompleteAsyncCompletableFuture whenCompleteAsyncFuture = future.whenCompleteAsync((result, exception) -> {System.out.println("whenCompleteAsync Thread: " + Thread.currentThread().getName());});// Ensure the program doesn't exit before the CompletableFutures completeCompletableFuture.allOf(whenCompleteFuture, whenCompleteAsyncFuture).join();
}
  1. whenComplete的回调函数在异步任务的执行线程上执行,因此在输出中你会看到Async ThreadwhenComplete Thread的线程名称是一致的。
  2. whenCompleteAsync的回调函数会在默认的ForkJoinPool中的某个线程上执行,因此whenCompleteAsync Thread的线程名称可能不同于Async Thread。
  3. CompletableFuture.allOf(…).join()用于等待所有的CompletableFutures完成,以保证程序不会在异步任务未完成时退出。

这篇关于Future、CompletionService、CompletableFuture的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/527827

相关文章

CompletableFuture类总结)

CompletableFuture类总结 ```javapublic class CompletableFuture<T> implements Future<T>, CompletionStage<T> {// 1、创建一个异步操作:无返回值public static CompletableFuture<Void> runAsync(Runnable runnable);pu

Java并发编程:Callable和Future使用

在Java中,创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果。我们一般只能采用共享变量或共享存储区以及线程通信的方式实现获得任务结果的目的。 不过,Java中,也提供了使用Callable和Future来实现获取任务结果的操作。Callable用来执行任务,产生结果,而Future用来获得结果。 C

异步任务组合神器CompletableFuture

使用Demo import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class CompletableFutureDemo {private static AtomicInteger count = new AtomicInteger(2);public static void

std::future和std::promise详解(原理、应用、源码)

在编程实践中,我们常常需要使用异步调用。通过异步调用,我们可以将一些耗时、阻塞的任务交给其他线程来执行,从而保证当前线程的快速响应能力。还有一些场景可以通过将一个任务,分成多个部分然后将这部分交给多个线程来进行并发执行,从而完成任务的快速计算执行,提高应用性能。这里就需要用到异步调用的概念 异步调用,就是当前线程将一个任务交给另外一个线程来进行执行,当前线程会接着执行当前任务,不需要等待这个交付

Java 8 CompletableFuture: 异步编程的新选择

Java 8 CompletableFuture: 异步编程的新选择 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 一、CompletableFuture简介 Java 8引入了CompletableFuture,这是对异步编程的全新支持。它提供了一种编写异步应用程序的方法,允许开发者以声明性的方式处理异步逻辑。 二、创建CompletableFutu

Future 原理模拟

大家已经知道Future可以异步返回结果,但是其中的原理 却并不是所有人都懂  今天偶然看见了一段模拟原理的代码 就记录一下 首先大致介绍下其中的原理 : 在客户端请求的时候,直接返回客户端需要的数据(此数据不一定完整,只是简单的一点不耗时的操作),但是客户端并不一定马上使用所有的信息,此时就有了时间去完善客户需要的信息  大致原理如此 说到底就是一个简单的异步操作,下面贴上源码 并做简单的介绍

Java 通过Future来对任务进行取消

本节我们将通过Java中的Future实现对于提交的任务进行取消。ExecutorService.submit将返回一个Future来描述任务,Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功(这个参数只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断)。如果mayInterrup

论文笔记:Estimating future human trajectories from sparse time series data

sigspatial 2023 humob竞赛paper hiimryo816/humob2023-MOBB (github.com) 1 数据集分析 这里只分享了HuMob数据集1的内容 1.1 假日分析 对HuMob数据集#1地理数据的方差分析显示了非工作日的模式 在某些天的y坐标方差中有显著的峰值,这是非工作日的象征【x坐标有相似的模式】 ——>识别了任务1数据集中最有可能是

Java源码分析----Future

一般使用多线程操作的时候会使用Thread+Runnable进行处理,但是这种方式中,Runnable是没有返回值的,假设我们需要获取Runnable的返回值,可能需要如下特殊处理,伪代码如下 String returnValue1 = "";String returnValue2 = "";CountDownLatch cdl = ....new Thread(()->{// xxxx操

java多线程-使用Future获得多线程的返回值

我们在多线程中很多时候会需要获得线程的返回值,但是多线程又不像普通方法一样,可以直接获得返回值,那么我们应该怎么获得返回值呢?   一、使用Future获得Callable的返回值 import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.conc