小议CompletableFuture 链式执行和响应式编程

2024-01-12 07:12

本文主要是介绍小议CompletableFuture 链式执行和响应式编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相关文章:

  • 用最简单的方式理解同步和异步、阻塞与非阻塞
  • CompletableFuture 实战

背景

昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。

这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。

对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:

/*** @author dongguabai* @date 2024-01-09 13:52*/
public abstract class MergeRequestHandler {private MergeRequestHandler next;public MergeRequestHandler(MergeRequestHandler next) {this.next = next;}public void handle(Task task) {if (run(task) && next != null) {next.handle(task);}}public abstract boolean run(Task task);
}
/*** @author dongguabai* @date 2024-01-09 15:53*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {public static MergeRequestHandler createHandlerChain() {return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));}
}

在链式工厂里面指定了执行顺序。

当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors

@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false);commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);proposalProcessor.initialize();firstProcessor = new PrepRequestProcessor(this, proposalProcessor);((PrepRequestProcessor)firstProcessor).start();}

然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain 时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture 来实现,这也是我当前代码改造的主要方向。

关于CompletableFuture的实现,我设计了两套方案。现在来看一下最终实现的方案。

抽象 Processor

@Log4j
abstract class Processor {public CompletableFuture<Task> process(Task task, Executor executor) {return CompletableFuture.supplyAsync(() -> doProcess(task), executor);}protected abstract Task doProcess(Task task);
}

实现类:

@Log4j
class Processor1 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor1 is processing");ProcessorChain.sleep(1);log.info("Processor1 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}
@Log4j
class Processor2 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor2 is processing");ProcessorChain.sleep(2);log.info("Processor2 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}

核心的处理链:

@Log4j
class ProcessorChain {private final List<Processor> processors;public ProcessorChain(List<Processor> processors) {this.processors = processors;}public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}public static void sleep(Integer i){try {TimeUnit.SECONDS.sleep(i);} catch (InterruptedException e) {e.printStackTrace();}}
}

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

输出:

22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646

效果挺符合诉求的,但这里也有几个疑问。

ProcessorChain#process中的执行顺序问题

先看 ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

朋友原话是:“他期望的执行顺序是 process1process2process3。但由于 thenComposeAsync是异步执行的,那么在循环到 process2 时,它会异步执行。此时,循环可能已经到了 process3,并开始执行 thenComposeAsync。这样,process3process2可能会并行执行,这跟期望结果不一致”。

其实这里是不会的,thenComposeAsync 可以翻译为 “然后异步组合”。

这里的 “组合” 指的是将当前的 CompletableFuture 与另一个 CompletableFuture 进行组合。“异步” 指的是这里的 Function 操作会在一个单独的线程中执行,不会阻塞当前的线程。

我这里的 Processor#process 函数返回的是 CompletableFuture,所以这里 thenComposeAsync 的流程是:等待当前的 CompletableFuture 完成后,异步执行 FunctionFunction 会返回一个新的 CompletableFuture,然后 thenComposeAsync 会返回这个 CompletableFuture

这里要注意 thenApplyAsyncthenComposeAsync 不要混淆了。

ProcessorChain#process中的阻塞与非阻塞、同步和异步问题

上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。

为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess 的实现。

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newSingleThreadExecutor();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

执行流程如下:

  1. main 线程开始执行,并调用 ProcessorChainprocess 方法。
  2. ProcessorChainprocess 方法中,main 线程创建了一个已经完成的 CompletableFuture,并开始遍历 Processor 对象。
  3. main 线程调用 thenComposeAsync 方法,此时 main 线程将 Processor1process 方法提交给 Executor,并立即返回一个新的 CompletableFuturemain 线程不会等待 Processor1process 方法完成,因此 main 线程是非阻塞的。
  4. Executor 的一个线程(称之为 Thread_1)开始执行 Processor1process 方法。由于 thenComposeAsync 的链式调用,Processor2process 方法会等待 Processor1process 方法完成后才开始执行,以此类推。
  5. main 线程继续遍历 Processor 对象,并对每个 Processor 重复步骤 3 和 4。每次调用 thenComposeAsync 方法时,main 线程都会立即返回一个新的 CompletableFuture,并将 Processorprocess 方法提交给 Executor。这意味着 main 线程是非阻塞的,而 Executor 的线程会按照 Processor 的顺序执行 process 方法。
  6. 当所有的 Processorprocess 方法都完成后,其中一个 Executor 的线程会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task); 只是作为一个“CompletableFuture 的启动器”(这么一说是不是就更好理解了)。

响应式编程

其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:

例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。

其实这里的 CompletableFuture 实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFutureFunction 完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。

可以增加日志看一下:

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {log.info(future);future = future.thenComposeAsync(t -> processor.process(t, executor), executor);log.info(future);// future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}

再次执行测试代码:

22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a

重新梳理执行流程:

  1. 最开始的启动器是 3fa77460,这是一个已经完成的 CompletableFuture 对象,用于启动异步操作链。

  2. 第一次循环:3fa77460 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 e2d56bf。此时,Processor1process 方法正在异步执行。

  3. 第二次循环:e2d56bf 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 244038d0。此时,Processor2process 方法正在等待 Processor1process 方法完成。

  4. 第三次循环:244038d0 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5680a178。此时,Processor3process 方法正在等待 Processor2process 方法完成。

  5. 第四次循环:5680a178 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5fdef03a。此时,Processor4process 方法正在等待 Processor3process 方法完成。

  6. 所有循环结束后,返回 5fdef03a。当所有的 Processorprocess 方法都完成后,5fdef03a 会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

这个过程中,每个 CompletableFuture 对象都会等待前一个 CompletableFuture 对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture 链,每个 CompletableFuture 对象都会按照 Processor 的顺序执行 process 方法。

CompletableFuture链

当调用thenComposeAsync方法时,会创建一个新的CompletableFuture对象:

    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
    private <V> CompletableFuture<V> uniComposeStage(Executor e, Function<? super T, ? extends CompletionStage<V>> f) {if (f == null) throw new NullPointerException();Object r; Throwable x;if (e == null && (r = result) != null) {// try to return function result directlyif (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {return new CompletableFuture<V>(encodeThrowable(x, r));}r = null;}try {@SuppressWarnings("unchecked") T t = (T) r;CompletableFuture<V> g = f.apply(t).toCompletableFuture();Object s = g.result;if (s != null)return new CompletableFuture<V>(encodeRelay(s));CompletableFuture<V> d = new CompletableFuture<V>();UniRelay<V> copy = new UniRelay<V>(d, g);g.push(copy);copy.tryFire(SYNC);return d;} catch (Throwable ex) {return new CompletableFuture<V>(encodeThrowable(ex));}}CompletableFuture<V> d = new CompletableFuture<V>();//构建 UniComposeUniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);push(c);c.tryFire(SYNC);return d;}

构造的 UniCompose对象持有前一个CompletableFuture对象的引用,也就是 UniComposesrc 字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。

总结

本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture 的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。

References

  • https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B

欢迎关注公众号:
在这里插入图片描述

这篇关于小议CompletableFuture 链式执行和响应式编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/597157

相关文章

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

在MySQL执行UPDATE语句时遇到的错误1175的解决方案

《在MySQL执行UPDATE语句时遇到的错误1175的解决方案》MySQL安全更新模式(SafeUpdateMode)限制了UPDATE和DELETE操作,要求使用WHERE子句时必须基于主键或索引... mysql 中遇到的 Error Code: 1175 是由于启用了 安全更新模式(Safe Upd

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

PyCharm接入DeepSeek实现AI编程的操作流程

《PyCharm接入DeepSeek实现AI编程的操作流程》DeepSeek是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的AI模型,接下来,我们把DeepSeek接入到PyCharm中... 目录引言效果演示创建API key在PyCharm中下载Continue插件配置Continue引言

Java CompletableFuture如何实现超时功能

《JavaCompletableFuture如何实现超时功能》:本文主要介绍实现超时功能的基本思路以及CompletableFuture(之后简称CF)是如何通过代码实现超时功能的,需要的... 目录基本思路CompletableFuture 的实现1. 基本实现流程2. 静态条件分析3. 内存泄露 bug

详解如何在React中执行条件渲染

《详解如何在React中执行条件渲染》在现代Web开发中,React作为一种流行的JavaScript库,为开发者提供了一种高效构建用户界面的方式,条件渲染是React中的一个关键概念,本文将深入探讨... 目录引言什么是条件渲染?基础示例使用逻辑与运算符(&&)使用条件语句列表中的条件渲染总结引言在现代

Spring MVC如何设置响应

《SpringMVC如何设置响应》本文介绍了如何在Spring框架中设置响应,并通过不同的注解返回静态页面、HTML片段和JSON数据,此外,还讲解了如何设置响应的状态码和Header... 目录1. 返回静态页面1.1 Spring 默认扫描路径1.2 @RestController2. 返回 html2

使用Python实现批量访问URL并解析XML响应功能

《使用Python实现批量访问URL并解析XML响应功能》在现代Web开发和数据抓取中,批量访问URL并解析响应内容是一个常见的需求,本文将详细介绍如何使用Python实现批量访问URL并解析XML响... 目录引言1. 背景与需求2. 工具方法实现2.1 单URL访问与解析代码实现代码说明2.2 示例调用