本文主要是介绍CompletableFuture、ListenableFuture高级用列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
CompletableFuture
链式
public static void main(String[] args) throws Exception {CompletableFuture<Integer> thenCompose = T1().thenCompose(Compress::T2).thenCompose(Compress::T3);Integer result = thenCompose.get();System.out.println(result);}// 假设这些是异步操作,并返回CompletableFuture<Integer>public static CompletableFuture<Integer> T1() {return CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});}public static CompletableFuture<Integer> T2(int valueFromT1) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int result = valueFromT1 * 2;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}return result;});}public static CompletableFuture<Integer> T3(int valueFromT2) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int finalResult = valueFromT2 + 10;return finalResult;});}
异步操作集合对象入库
public void saveUsers(List<User> users) throws InterruptedException, ExecutionException {// 将大集合分成若干个小集合,每个大小为100(具体分片大小根据实际需求调整)List<List<User>> partitions = users.stream().collect(Collectors.groupingBy(it -> users.indexOf(it) / 100)).values().stream().collect(Collectors.toList());List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {userRepository.save(user); // 假设这是你的保存方法});});futures.add(future);}// 等待所有任务完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();}
CompletableFuture
异常
public static void saveTut(List<User> users) throws InterruptedException, ExecutionException{// 将大集合分成若干个小集合ThreadLocal<AtomicInteger> threadLocal = ThreadLocal.withInitial(() -> new AtomicInteger(0));List<List<User>> partitions = Lists.partition(users, 10);List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {AtomicInteger value = threadLocal.get();value.set(user.getId());value.incrementAndGet();});});// 添加异常处理器future.exceptionally(ex -> {// 记录异常信息//System.out.println("===="+threadLocal.get().intValue());return null;});futures.add(future);}// 等待所有任务完成,并处理整体完成时的异常CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));allFutures.thenAccept(v -> {System.out.println("All save tasks completed.");}).exceptionally(ex -> {// 记录整体完成时的异常//System.err.println(ex.getMessage());return null;});// 确保阻塞直到所有异步操作完成allFutures.get();
}
ListenableFuture
public class ListProcessingExample {private static final ListeningExecutorService EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));public static void main(String[] args) throws InterruptedException {List<User> users = Arrays.asList(new User(1), new User(2), new User(3));//processUsersAsync001(users);}public static void processUsersAsync001(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);//线程数List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<Void> future = EXECUTOR_SERVICE.submit(() -> {partition.forEach(user -> {System.out.println("Processing user: " + user.getId());});return null;});Futures.addCallback(future, new FutureCallback<Void>() {@Overridepublic void onSuccess(Void result) {System.out.println("Successfully processed a batch of users.");}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}/*** 增加成功回调获取数据*/public static void processUsersAsync002(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<ProcessedUserResult>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<ProcessedUserResult> future = EXECUTOR_SERVICE.submit(() -> {ProcessedUserResult result = new ProcessedUserResult();partition.forEach(user -> {System.out.println("Processing user: " + user.getId());result.successfulIds.add(user.getId());});return result;});Futures.addCallback(future, new FutureCallback<ProcessedUserResult>() {@Overridepublic void onSuccess(ProcessedUserResult result) {System.out.println("Successfully processed users with IDs: " + result.successfulIds);}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}static class ProcessedUserResult {// 用于存储成功处理的用户ID列表private List<Integer> successfulIds = new ArrayList<>();// 用于存储失败处理的用户ID列表private List<Integer> failedIds = new ArrayList<>();public void addSuccessfulId(int userId) {this.successfulIds.add(userId);}public List<Integer> getSuccessfulIds() {return successfulIds;}public void addFailedId(int userId) {this.failedIds.add(userId);}public List<Integer> getFailedIds() {return failedIds;}}static class User {private int id;public User(int id) {this.id = id;}public int getId() {return id;}}
}
这篇关于CompletableFuture、ListenableFuture高级用列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!