Reactor Mono应用

2024-06-22 08:20
文章标签 应用 reactor mono

本文主要是介绍Reactor Mono应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用案例

创建Mono

使用静态工厂方法创建Mono
import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {// 创建一个包含值的MonoMono<String> monoWithValue = Mono.just("Hello, Reactor!");// 创建一个空的MonoMono<String> emptyMono = Mono.empty();// 创建一个包含错误的MonoMono<String> errorMono = Mono.error(new RuntimeException("Something went wrong"));}
}
从Callable、Supplier、CompletableFuture等创建Mono
import reactor.core.publisher.Mono;import java.util.concurrent.CompletableFuture;public class MonoExample {public static void main(String[] args) {// 从Callable创建MonoMono<String> callableMono = Mono.fromCallable(() -> "Hello from Callable");// 从Supplier创建MonoMono<String> supplierMono = Mono.fromSupplier(() -> "Hello from Supplier");// 从CompletableFuture创建MonoCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from CompletableFuture");Mono<String> futureMono = Mono.fromFuture(future);}
}

订阅Mono

订阅是开始数据流处理的关键步骤

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {Mono<String> mono = Mono.just("Hello, Reactor!");// 订阅并消费数据mono.subscribe(value -> System.out.println("Received: " + value), // onNexterror -> System.err.println("Error: " + error),   // onError() -> System.out.println("Completed")             // onComplete);}
}

操作Mono

转换和操作数据
import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {Mono<String> mono = Mono.just("hello");// 转换数据Mono<String> transformedMono = mono.map(String::toUpperCase);// 链式操作transformedMono.flatMap(value -> Mono.just(value + " World")).subscribe(System.out::println);  // 输出: HELLO World}
}

异常处理

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {Mono<String> monoWithError = Mono.error(new RuntimeException("Original Error"));// 捕获并处理错误monoWithError.onErrorReturn("Fallback value").subscribe(System.out::println);  // 输出: Fallback value// 使用 onErrorResume 提供备用的 MonomonoWithError.onErrorResume(error -> {System.err.println("Error: " + error);return Mono.just("Recovered value");}).subscribe(System.out::println);  // 输出: Recovered value}
}

组合Mono

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {Mono<String> mono1 = Mono.just("Hello");Mono<String> mono2 = Mono.just("World");// 组合两个MonoMono<String> combinedMono = Mono.zip(mono1, mono2, (s1, s2) -> s1 + " " + s2);combinedMono.subscribe(System.out::println);  // 输出: Hello World}
}

调试和日志

使用日志功能可以帮助调试和监控数据流。

import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;public class MonoExample {public static void main(String[] args) {Logger logger = Loggers.getLogger(MonoExample.class);Mono<String> mono = Mono.just("Hello, Reactor!").log();  // 默认日志mono.subscribe(System.out::println);}
}

调度器(Schedulers)

使用调度器来控制Mono的执行线程。

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;public class MonoExample {public static void main(String[] args) {Mono<String> mono = Mono.fromCallable(() -> {// 在独立的线程池中执行Thread.sleep(1000);return "Hello from another thread";});mono.subscribeOn(Schedulers.boundedElastic())  // 指定订阅时的调度器.publishOn(Schedulers.parallel())         // 指定发布时的调度器.subscribe(System.out::println);}
}

应用场景

异步计算

Mono可以用来表示和处理异步计算的结果。例如,当你需要从一个异步操作中获取一个值时,可以使用Mono。

Mono<String> asyncResult = Mono.fromCallable(() -> {// 模拟异步计算Thread.sleep(1000);return "Result";
});asyncResult.subscribe(result -> System.out.println("Received: " + result));

调用远程服务

在微服务架构中,调用远程服务(如REST API或gRPC)时,通常会返回一个单一的结果。这是Mono的一个典型应用场景。

WebClient webClient = WebClient.create("http://example.com");Mono<String> response = webClient.get().uri("/resource").retrieve().bodyToMono(String.class);response.subscribe(body -> System.out.println("Response: " + body));

数据库查询

Mono非常适合表示数据库查询返回的单个结果。例如,查询一个用户的信息

Mono<User> userMono = reactiveUserRepository.findById(userId);userMono.subscribe(user -> System.out.println("User: " + user));

事件驱动的处理

在事件驱动架构中,某些事件处理结果可能是单一的值。例如,处理某个事件并返回一个处理结果

Mono<EventResult> eventResultMono = processEvent(event);eventResultMono.subscribe(result -> System.out.println("Event processed: " + result));

错误处理

使用Mono可以优雅地处理异步操作中的错误。例如,如果某个操作可能会失败,可以返回一个错误的Mono并在订阅时处理错误

Mono<String> result = performOperation().onErrorReturn("Fallback value");result.subscribe(value -> System.out.println("Received: " + value),error -> System.err.println("Error: " + error)
);

延迟操作

Mono可以用于表示一个延迟操作,执行某些延迟逻辑

Mono<Long> delayMono = Mono.delay(Duration.ofSeconds(3));delayMono.subscribe(time -> System.out.println("Delayed for 3 seconds"));

条件逻辑

使用Mono可以在异步流中进行条件判断和逻辑处理。例如,根据某个条件返回不同的结果

Mono<String> conditionalMono = Mono.just("data").flatMap(data -> {if (data.equals("condition")) {return Mono.just("Condition met");} else {return Mono.just("Condition not met");}});conditionalMono.subscribe(System.out::println);

转换与映射

Mono可以用于对单个值进行转换或映射。例如,将一个值转换为另一个类型的值

Mono<String> originalMono = Mono.just("original");Mono<Integer> transformedMono = originalMono.map(String::length);transformedMono.subscribe(length -> System.out.println("Length: " + length));

资源管理

Mono可以用于在异步操作中管理资源,如文件或连接的打开和关闭

Mono.using(() -> new BufferedReader(new FileReader("data.txt")), reader -> Mono.fromCallable(() -> reader.readLine()), BufferedReader::close
).subscribe(line -> System.out.println("Read line: " + line));

组合多个异步操作

Mono可以用于组合多个异步操作,构建复杂的异步数据流

Mono<String> combinedMono = Mono.zip(Mono.just("Hello"),Mono.just("World"),(s1, s2) -> s1 + " " + s2
);combinedMono.subscribe(System.out::println);  // 输出: Hello World

这篇关于Reactor Mono应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1083753

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

AI行业应用(不定期更新)

ChatPDF 可以让你上传一个 PDF 文件,然后针对这个 PDF 进行小结和提问。你可以把各种各样你要研究的分析报告交给它,快速获取到想要知道的信息。https://www.chatpdf.com/

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。

【C++高阶】C++类型转换全攻略:深入理解并高效应用

📝个人主页🌹:Eternity._ ⏩收录专栏⏪:C++ “ 登神长阶 ” 🤡往期回顾🤡:C++ 智能指针 🌹🌹期待您的关注 🌹🌹 ❀C++的类型转换 📒1. C语言中的类型转换📚2. C++强制类型转换⛰️static_cast🌞reinterpret_cast⭐const_cast🍁dynamic_cast 📜3. C++强制类型转换的原因📝

基于 YOLOv5 的积水检测系统:打造高效智能的智慧城市应用

在城市发展中,积水问题日益严重,特别是在大雨过后,积水往往会影响交通甚至威胁人们的安全。通过现代计算机视觉技术,我们能够智能化地检测和识别积水区域,减少潜在危险。本文将介绍如何使用 YOLOv5 和 PyQt5 搭建一个积水检测系统,结合深度学习和直观的图形界面,为用户提供高效的解决方案。 源码地址: PyQt5+YoloV5 实现积水检测系统 预览: 项目背景