本文主要是介绍使用 CompletableFuture 分批处理任务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、无返回值任务函数
// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<Void>> futures = new ArrayList<>(batches.size());// 数据处理
for (int i = 0; i < batches.size(); i++) {logger.info("批次 " + i + " 开始处理...");String logId = LogIdThreadLocal.getLogId(); // 传递主线程的 logIdList<StatisticsDTO> batchData = batches.get(i);CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {LogIdThreadLocal.setLogId(logId);processBatch(batchData);} finally {LogIdThreadLocal.clean();}});futures.add(future);
}// 等待所有的异步任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.join();
二、带返回值任务函数
// 数据分批
List<List<StatisticsDTO>> batches = Lists.partition(statisticsList, BATCH_SIZE);
List<CompletableFuture<List<StatisticsDTO>>> futures = new ArrayList<>(batches.size());// 数据处理
for (int i = 0; i < batches.size(); i++) {logger.info("批次 " + i + " 开始处理...");String logId = LogIdThreadLocal.getLogId(); // 传递主线程的 logIdList<StatisticsDTO> batchData = batches.get(i);CompletableFuture<List<DoctorAvatarAnalysisDTO>> future = CompletableFuture.supplyAsync(() -> {try {LogIdThreadLocal.setLogId(logId);return processBatch(batchData);} finally {LogIdThreadLocal.clean();}});futures.add(future);
}// 等待所有 CF 完成并合并结果
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List<StatisticsDTO> result = allOf.thenApply(v -> futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList())
).join();
这篇关于使用 CompletableFuture 分批处理任务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!