小议CompletableFuture 链式执行和响应式编程

2024-01-12 07:12

本文主要是介绍小议CompletableFuture 链式执行和响应式编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

相关文章:

  • 用最简单的方式理解同步和异步、阻塞与非阻塞
  • CompletableFuture 实战

背景

昨晚和一个朋友讨论了他在开发过程中遇到的一个场景设计问题。这个场景可以简化为:服务接收到一个需要处理的任务请求,然后立即返回。这个任务需要经过四个处理器的处理,每个处理器的处理都依赖于前一个处理器的处理结果。

这个场景与我最近处理 GitLab WebHook 的工作流程非常相似。例如,我从 GitLab 接收到一个事件后,需要对数据进行清洗、解析、调用 AI 大模型和发布评论等操作,每个操作都依赖于前一个操作的结果。

对于这种场景的设计,我目前采用的是基于链式设计的方法。我定义了一个抽象的处理器类和链式工厂:

/*** @author dongguabai* @date 2024-01-09 13:52*/
public abstract class MergeRequestHandler {private MergeRequestHandler next;public MergeRequestHandler(MergeRequestHandler next) {this.next = next;}public void handle(Task task) {if (run(task) && next != null) {next.handle(task);}}public abstract boolean run(Task task);
}
/*** @author dongguabai* @date 2024-01-09 15:53*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class MergeRequestHandlerFactory {public static MergeRequestHandler createHandlerChain() {return new ActionHandler(new MergeRequestRetrievalHandler(new CommentHandler(new FinalHandler())));}
}

在链式工厂里面指定了执行顺序。

当然,也可以参考 ZooKeeper 的实现:org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors

@Overrideprotected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);commitProcessor = new CommitProcessor(toBeAppliedProcessor,Long.toString(getServerId()), false);commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,commitProcessor);proposalProcessor.initialize();firstProcessor = new PrepRequestProcessor(this, proposalProcessor);((PrepRequestProcessor)firstProcessor).start();}

然后,朋友提出要将其改为异步处理。实际上,异步处理也是可以很简单实现的,只需在执行 HandlerChain 时将其放入线程池中即可。但在这里,我完全可以使用 CompletableFuture 来实现,这也是我当前代码改造的主要方向。

关于CompletableFuture的实现,我设计了两套方案。现在来看一下最终实现的方案。

抽象 Processor

@Log4j
abstract class Processor {public CompletableFuture<Task> process(Task task, Executor executor) {return CompletableFuture.supplyAsync(() -> doProcess(task), executor);}protected abstract Task doProcess(Task task);
}

实现类:

@Log4j
class Processor1 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor1 is processing");ProcessorChain.sleep(1);log.info("Processor1 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}
@Log4j
class Processor2 extends Processor {@Overrideprotected Task doProcess(Task task) {// 你的处理逻辑log.info("Processor2 is processing");ProcessorChain.sleep(2);log.info("Processor2 end");if (task.isCancelled()) {throw new CancellationException("Task was cancelled");}return task;}
}

核心的处理链:

@Log4j
class ProcessorChain {private final List<Processor> processors;public ProcessorChain(List<Processor> processors) {this.processors = processors;}public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}public static void sleep(Integer i){try {TimeUnit.SECONDS.sleep(i);} catch (InterruptedException e) {e.printStackTrace();}}
}

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> exceptionally = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

输出:

22:38:35.276 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:38:35.276 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:38:36.285 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:38:36.285 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 is processing
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor2 - Processor2 end
22:38:38.290 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 is processing
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor3 - Processor3 end
22:38:41.294 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 is processing
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.Processor4 - Processor4 end
22:38:45.299 [pool-1-thread-3] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@6dfb9646

效果挺符合诉求的,但这里也有几个疑问。

ProcessorChain#process中的执行顺序问题

先看 ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

朋友原话是:“他期望的执行顺序是 process1process2process3。但由于 thenComposeAsync是异步执行的,那么在循环到 process2 时,它会异步执行。此时,循环可能已经到了 process3,并开始执行 thenComposeAsync。这样,process3process2可能会并行执行,这跟期望结果不一致”。

其实这里是不会的,thenComposeAsync 可以翻译为 “然后异步组合”。

这里的 “组合” 指的是将当前的 CompletableFuture 与另一个 CompletableFuture 进行组合。“异步” 指的是这里的 Function 操作会在一个单独的线程中执行,不会阻塞当前的线程。

我这里的 Processor#process 函数返回的是 CompletableFuture,所以这里 thenComposeAsync 的流程是:等待当前的 CompletableFuture 完成后,异步执行 FunctionFunction 会返回一个新的 CompletableFuture,然后 thenComposeAsync 会返回这个 CompletableFuture

这里要注意 thenApplyAsyncthenComposeAsync 不要混淆了。

ProcessorChain#process中的阻塞与非阻塞、同步和异步问题

上文已经说明了 ProcessorChain#process 中的执行顺序问题。接下来看一下这段代码中的阻塞与非阻塞、同步和异步问题。

为了查看方便,我这里再贴一下测试代码和 ProcessorChain#proccess 的实现。

测试代码:

    public static void main(String[] args) {Executor executor = Executors.newSingleThreadExecutor();ProcessorChain processorChain = new ProcessorChain(Arrays.asList(new Processor1(), new Processor2(), new Processor3(), new Processor4()));Task task = new Task("1");CompletableFuture<Void> c = processorChain.process(task, executor).exceptionally(ex -> {System.out.println("处理过程中出现错误:" + ex.getMessage());return null;});log.info("处理完成");}

ProcessorChain#proccess

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {future = future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result -> System.out.println("处理完成,结果是:" + result));}

执行流程如下:

  1. main 线程开始执行,并调用 ProcessorChainprocess 方法。
  2. ProcessorChainprocess 方法中,main 线程创建了一个已经完成的 CompletableFuture,并开始遍历 Processor 对象。
  3. main 线程调用 thenComposeAsync 方法,此时 main 线程将 Processor1process 方法提交给 Executor,并立即返回一个新的 CompletableFuturemain 线程不会等待 Processor1process 方法完成,因此 main 线程是非阻塞的。
  4. Executor 的一个线程(称之为 Thread_1)开始执行 Processor1process 方法。由于 thenComposeAsync 的链式调用,Processor2process 方法会等待 Processor1process 方法完成后才开始执行,以此类推。
  5. main 线程继续遍历 Processor 对象,并对每个 Processor 重复步骤 3 和 4。每次调用 thenComposeAsync 方法时,main 线程都会立即返回一个新的 CompletableFuture,并将 Processorprocess 方法提交给 Executor。这意味着 main 线程是非阻塞的,而 Executor 的线程会按照 Processor 的顺序执行 process 方法。
  6. 当所有的 Processorprocess 方法都完成后,其中一个 Executor 的线程会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

其实这里的 CompletableFuture<Task> future = CompletableFuture.completedFuture(task); 只是作为一个“CompletableFuture 的启动器”(这么一说是不是就更好理解了)。

响应式编程

其实上面的内容让我想起了响应式编程,引入维基百科的一个说明:

例如,在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化 。

其实这里的 CompletableFuture 实际上是响应式编程的一个简单例子,因为它表示一个异步计算的结果,我们可以在它上面“提前”注册回调函数(我觉得这个“提前”是精髓),这些回调函数会在 CompletableFutureFunction 完成时被调用。而我这里就是提前准备好了一个结果,即 CompletableFuture,然后不断的循环在其上面增加回调。

可以增加日志看一下:

    public CompletableFuture<Void> process(Task task, Executor executor) {CompletableFuture<Task> future = CompletableFuture.completedFuture(task);for (Processor processor : processors) {log.info(future);future = future.thenComposeAsync(t -> processor.process(t, executor), executor);log.info(future);// future.thenComposeAsync(t -> processor.process(t, executor), executor);}return future.thenAccept(result ->log.info("处理完成,结果是:" + result));}

再次执行测试代码:

22:56:42.512 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@3fa77460[Completed normally]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.611 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@e2d56bf[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@244038d0[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5680a178[Not completed]
22:56:42.612 [main] INFO demo.sb1.ProcessorChain - java.util.concurrent.CompletableFuture@5fdef03a[Not completed]
22:56:42.614 [main] INFO demo.sb1.ProcessorChainTest - 处理完成
22:56:42.615 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 is processing
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor1 - Processor1 end
22:56:43.620 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 is processing
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor2 - Processor2 end
22:56:45.625 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 is processing
22:56:48.630 [pool-1-thread-2] INFO demo.sb1.Processor3 - Processor3 end
22:56:48.631 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 is processing
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.Processor4 - Processor4 end
22:56:52.635 [pool-1-thread-2] INFO demo.sb1.ProcessorChain - 处理完成,结果是:demo.sb1.Task@58d4f75a

重新梳理执行流程:

  1. 最开始的启动器是 3fa77460,这是一个已经完成的 CompletableFuture 对象,用于启动异步操作链。

  2. 第一次循环:3fa77460 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 e2d56bf。此时,Processor1process 方法正在异步执行。

  3. 第二次循环:e2d56bf 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 244038d0。此时,Processor2process 方法正在等待 Processor1process 方法完成。

  4. 第三次循环:244038d0 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5680a178。此时,Processor3process 方法正在等待 Processor2process 方法完成。

  5. 第四次循环:5680a178 执行 thenComposeAsync 方法,返回新的 CompletableFuture 对象 5fdef03a。此时,Processor4process 方法正在等待 Processor3process 方法完成。

  6. 所有循环结束后,返回 5fdef03a。当所有的 Processorprocess 方法都完成后,5fdef03a 会执行 thenAccept 方法,处理最后一个 Processorprocess 方法返回的结果。

这个过程中,每个 CompletableFuture 对象都会等待前一个 CompletableFuture 对象完成,然后开始执行自己的异步操作。这就形成了一个 CompletableFuture 链,每个 CompletableFuture 对象都会按照 Processor 的顺序执行 process 方法。

CompletableFuture链

当调用thenComposeAsync方法时,会创建一个新的CompletableFuture对象:

    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
    private <V> CompletableFuture<V> uniComposeStage(Executor e, Function<? super T, ? extends CompletionStage<V>> f) {if (f == null) throw new NullPointerException();Object r; Throwable x;if (e == null && (r = result) != null) {// try to return function result directlyif (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {return new CompletableFuture<V>(encodeThrowable(x, r));}r = null;}try {@SuppressWarnings("unchecked") T t = (T) r;CompletableFuture<V> g = f.apply(t).toCompletableFuture();Object s = g.result;if (s != null)return new CompletableFuture<V>(encodeRelay(s));CompletableFuture<V> d = new CompletableFuture<V>();UniRelay<V> copy = new UniRelay<V>(d, g);g.push(copy);copy.tryFire(SYNC);return d;} catch (Throwable ex) {return new CompletableFuture<V>(encodeThrowable(ex));}}CompletableFuture<V> d = new CompletableFuture<V>();//构建 UniComposeUniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);push(c);c.tryFire(SYNC);return d;}

构造的 UniCompose对象持有前一个CompletableFuture对象的引用,也就是 UniComposesrc 字段。所以即使循环中出现 CompletableFuture 被覆盖,也不用担心被 GC 的问题。

总结

本文以一个实际的场景设计问题为出发点,详细探讨了 CompletableFuture 的链式执行机制,对执行流程和线程切换进行了深入的分析,并对响应式编程的概念和应用进行了简单的讨论。

References

  • https://zh.wikipedia.org/zh-cn/%E5%93%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B

欢迎关注公众号:
在这里插入图片描述

这篇关于小议CompletableFuture 链式执行和响应式编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/597157

相关文章

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

maven 编译构建可以执行的jar包

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~ 专栏导航 Python系列: Python面试题合集,剑指大厂Git系列: Git操作技巧GO

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

生信代码入门:从零开始掌握生物信息学编程技能

少走弯路,高效分析;了解生信云,访问 【生信圆桌x生信专用云服务器】 : www.tebteb.cc 介绍 生物信息学是一个高度跨学科的领域,结合了生物学、计算机科学和统计学。随着高通量测序技术的发展,海量的生物数据需要通过编程来进行处理和分析。因此,掌握生信编程技能,成为每一个生物信息学研究者的必备能力。 生信代码入门,旨在帮助初学者从零开始学习生物信息学中的编程基础。通过学习常用