CompletableFuture、ListenableFuture高级用列

2024-01-13 07:36

本文主要是介绍CompletableFuture、ListenableFuture高级用列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CompletableFuture
链式

 public static void main(String[] args) throws Exception {CompletableFuture<Integer> thenCompose = T1().thenCompose(Compress::T2).thenCompose(Compress::T3);Integer result = thenCompose.get();System.out.println(result);}// 假设这些是异步操作,并返回CompletableFuture<Integer>public static CompletableFuture<Integer> T1() {return CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});}public static CompletableFuture<Integer> T2(int valueFromT1) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int result = valueFromT1 * 2;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}return result;});}public static CompletableFuture<Integer> T3(int valueFromT2) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int finalResult = valueFromT2 + 10;return finalResult;});}

异步操作集合对象入库

public void saveUsers(List<User> users) throws InterruptedException, ExecutionException {// 将大集合分成若干个小集合,每个大小为100(具体分片大小根据实际需求调整)List<List<User>> partitions = users.stream().collect(Collectors.groupingBy(it -> users.indexOf(it) / 100)).values().stream().collect(Collectors.toList());List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {userRepository.save(user); // 假设这是你的保存方法});});futures.add(future);}// 等待所有任务完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();}

CompletableFuture
异常

public static void saveTut(List<User> users) throws InterruptedException, ExecutionException{// 将大集合分成若干个小集合ThreadLocal<AtomicInteger> threadLocal = ThreadLocal.withInitial(() -> new AtomicInteger(0));List<List<User>> partitions = Lists.partition(users, 10);List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {AtomicInteger value = threadLocal.get();value.set(user.getId());value.incrementAndGet();});});// 添加异常处理器future.exceptionally(ex -> {// 记录异常信息//System.out.println("===="+threadLocal.get().intValue());return null;});futures.add(future);}// 等待所有任务完成,并处理整体完成时的异常CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));allFutures.thenAccept(v -> {System.out.println("All save tasks completed.");}).exceptionally(ex -> {// 记录整体完成时的异常//System.err.println(ex.getMessage());return null;});// 确保阻塞直到所有异步操作完成allFutures.get();
} 

ListenableFuture

public class ListProcessingExample  {private static final ListeningExecutorService EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));public static void main(String[] args) throws InterruptedException {List<User> users = Arrays.asList(new User(1), new User(2), new User(3));//processUsersAsync001(users);}public static void processUsersAsync001(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);//线程数List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<Void> future = EXECUTOR_SERVICE.submit(() -> {partition.forEach(user -> {System.out.println("Processing user: " + user.getId());});return null;});Futures.addCallback(future, new FutureCallback<Void>() {@Overridepublic void onSuccess(Void result) {System.out.println("Successfully processed a batch of users.");}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}/*** 增加成功回调获取数据*/public static void processUsersAsync002(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<ProcessedUserResult>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<ProcessedUserResult> future = EXECUTOR_SERVICE.submit(() -> {ProcessedUserResult result = new ProcessedUserResult();partition.forEach(user -> {System.out.println("Processing user: " + user.getId());result.successfulIds.add(user.getId());});return result;});Futures.addCallback(future, new FutureCallback<ProcessedUserResult>() {@Overridepublic void onSuccess(ProcessedUserResult result) {System.out.println("Successfully processed users with IDs: " + result.successfulIds);}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}static class ProcessedUserResult {// 用于存储成功处理的用户ID列表private List<Integer> successfulIds = new ArrayList<>();// 用于存储失败处理的用户ID列表private List<Integer> failedIds = new ArrayList<>();public void addSuccessfulId(int userId) {this.successfulIds.add(userId);}public List<Integer> getSuccessfulIds() {return successfulIds;}public void addFailedId(int userId) {this.failedIds.add(userId);}public List<Integer> getFailedIds() {return failedIds;}}static class User {private int id;public User(int id) {this.id = id;}public int getId() {return id;}}
}

这篇关于CompletableFuture、ListenableFuture高级用列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/600679

相关文章

深入解析Spring TransactionTemplate 高级用法(示例代码)

《深入解析SpringTransactionTemplate高级用法(示例代码)》TransactionTemplate是Spring框架中一个强大的工具,它允许开发者以编程方式控制事务,通过... 目录1. TransactionTemplate 的核心概念2. 核心接口和类3. TransactionT

Java CompletableFuture如何实现超时功能

《JavaCompletableFuture如何实现超时功能》:本文主要介绍实现超时功能的基本思路以及CompletableFuture(之后简称CF)是如何通过代码实现超时功能的,需要的... 目录基本思路CompletableFuture 的实现1. 基本实现流程2. 静态条件分析3. 内存泄露 bug

Python中列表的高级索引技巧分享

《Python中列表的高级索引技巧分享》列表是Python中最常用的数据结构之一,它允许你存储多个元素,并且可以通过索引来访问这些元素,本文将带你深入了解Python列表的高级索引技巧,希望对... 目录1.基本索引2.切片3.负数索引切片4.步长5.多维列表6.列表解析7.切片赋值8.删除元素9.反转列表

正则表达式高级应用与性能优化记录

《正则表达式高级应用与性能优化记录》本文介绍了正则表达式的高级应用和性能优化技巧,包括文本拆分、合并、XML/HTML解析、数据分析、以及性能优化方法,通过这些技巧,可以更高效地利用正则表达式进行复杂... 目录第6章:正则表达式的高级应用6.1 模式匹配与文本处理6.1.1 文本拆分6.1.2 文本合并6

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

CompletableFuture类总结)

CompletableFuture类总结 ```javapublic class CompletableFuture<T> implements Future<T>, CompletionStage<T> {// 1、创建一个异步操作:无返回值public static CompletableFuture<Void> runAsync(Runnable runnable);pu

Mysql高级篇(中)——索引介绍

Mysql高级篇(中)——索引介绍 一、索引本质二、索引优缺点三、索引分类(1)按数据结构分类(2)按功能分类(3) 按存储引擎分类(4) 按存储方式分类(5) 按使用方式分类 四、 索引基本语法(1)创建索引(2)查看索引(3)删除索引(4)ALTER 关键字创建/删除索引 五、适合创建索引的情况思考题 六、不适合创建索引的情况 一、索引本质 索引本质 是 一种数据结构,它用

linux高级学习10

24.9.7学习目录 一.线程1.线程API 一.线程 线程与进程的关系: 线程是轻量级进程,也有PCB,只是各自不同,创建线程使用的底层函数和进程一样,都是clone进程可以蜕变成线程线程是最小的执行单位,进程是最小的分配资源单位 1.线程API (1)查看线程号 #include <pthread.h>pthread_t pthread_self(); (2)