本文主要是介绍CompletableFuture:组合式异步编程-Java 8,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
概述
我们看到的下一代网络应用都采用“混聚”(mash-up)的方式:它会使用来自多个来源的内容,将这些内容聚合在一起,方便用户生活。
比如,你可能希望为你的法国客户提供指定主题的热点报道。为实现这一功能,你需要向谷歌或者Twitter的API请求所有语言中针对该主体最热门的评论,可能还需要依据你的内部算法对它们的相关性进行排序。之后,你可能还需要使用谷歌的翻译服务把它们翻译成法语,甚至利用谷歌地图服务定位出评论者的位置信息,最终将所有这些信息聚集起来,呈现在你的网站上。
当然,如果某些外部网络服务发生响应慢的情况,你希望依旧能为用户提供部分信息,比如提供带问号标记的通用地图,以文本的方式显示信息,而不是呆呆地显示一片空白屏幕,直到地图服务器返回结果或者超时退出。下图就是典型的“混聚”应用如何与所需的远程服务交互。
要实现类似的服务,你需要与互联网上的多个web服务通信。可是,你并不希望因为等待某些服务的响应,阻塞应用程序的运行,浪费十亿宝贵的CPU时钟周期。比如,不要因为等待Facebook的数据,暂停对来自Twitter的数据处理。
这些场景体现了多任务程序设计的另一面。之前介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;他们将一个操作切分成多个子操作,在多个不同的核、CPU甚至是机器上并行地执行这些子操作。
与此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上朱熹几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。通过本章你会了解,Future接口,尤其是它的新版CompletableFuture,是处理这种情况的利器。下面说明了并行和并发的区别。
Future接口
Future接口在Java 5中被引入,设计的初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发哪些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的店员会给你一张发票,告诉你什么时候你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。Future的另一优点是它比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装到Callable对象中,再将它提交给ExecutorService,就可以了。下面的代码是Java 8之前使用Future的一个例子。
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {public Double call() {return doSomeLongComputation();}
});
douSomethingElse();
try {Double result = future.get(1, TimeUnit.SECONDS);
} catch(ExecutionException ee) {// 计算抛出一个异常
} catch(InterruptedException ie) {// 当前线程在等待过程中被中断
} catch(TimeoutException te) {// 在Future对象完成之前超过已过期
}
正如上面的过程那样,这种编程方式让你的线程可以在ExecutorService以并发方法调用另一个线程执行耗时操作的同时,去执行一些其他任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。
你能想象这种场景存在怎样的问题吗?如果该长时间运行的操作永远不返回会怎样?为了处理这种可能性,虽然Future提供一个无参的get方法,我们还是推荐大家使用重载版本的get方法,它接受一个超时的参数,你可以定义你的线程等待Future结果的最长时间,而不是永无止境的等下去。
Future接口的局限性
我们知道Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一查询操作结果合并”。但是,使用Future中提供的方法完成这样的操作又是另一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些场景,我们需要把需求以命令式的方式描述出来。
- 将两个异步计算合并为一个一一这两个异步计算之间相互独立,同时第二个又依赖第一个的结果。
- 等待Future集合中的所有任务都完成。
- 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
- 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
- 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单的阻塞等待操作的结果)。
这一章,你会了解新的CompletableFuture类(它实现了Future接口)如何利用Java 8的新特性以更直观的方式将上述需求都变为可能。Stream和CompletableFuture的设计都遵循了类似的模式:它们都使用了Lambda表达式以及流水线的思想。从这个角度,你可以说CompletableFuture和Future的关系就跟Stream与Collection的关系一样。
使用CompletableFuture构建异步应用
为了展示CompletableFuture的强大特性,我们会创建一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。
- 首先,你会学到如何为你的客户提供异步API(如果你拥有一间在线商店的话,这是非常有帮助的)。
- 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码一一最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
- 你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所以的商品店都返回它们各自的价格(这种方式存在一定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。
同步API与异步API
同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。
与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方式异步的一一这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没有载入内存,你只需要等待数据的载入完成。
实现异步API
为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。首先商店应该声明已经指定产品名称返回价格的方法:
public class Shop {public double getPrice(String product) {return calculatePrice(product);}private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);}public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成等待1s钟,这是无法接受的,尤其是考虑到最佳价格查询器对网络中所有商品都要重复这种操作。本章接下来的小节中,你会了解如何以异步方式使用同步API解决这个问题。
将同步方法转换为异步方法
Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。这意味着Future是一个暂时还不可知的处理器,这个值在计算完成后,可以通过调用它的get方法取得。因为这样的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他的有价值的计算任务。新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松实现这个方法,比如下面这段代码。
public Future<Double> getPriceAsync(String product) {// 创建CompletableFuture对象,它会包含计算的结果CompletableFuture<Double> futurePrice = new CompletableFuture<>();new Thread(() -> {final double price = calculatePrice(product);// 需要长时间计算的任务结束并得出结果时,设置Future的返回值futurePrice.complete(price);}).start();// 无需等待还没结束的计算,直接返回Future对象return futurePrice;
}// 客户端代码调用
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + " msecs");
// 执行更多任务,比如查询其他商店
doSomethingElse();
// 获取商品的价格
try {double price = futurePrice.get();System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
我们看到这段代码中,客户向商店查询了某种商品的价格。由于商?提供了异步API,该次调用立即返回了一个 Future 对象,通过该对象客户可以在将来的某个时刻取得商品的价格。这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商品中商品的价格,不会呆呆的阻塞在那里等待第一家商品返回请求的结果。最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商?价格时,再调用 Future 的 get 方法。执行了这个操作后,客户要么获得 Future 中封装装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。
错误处理
但是,如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待 get 方法返回结果的客户端永久地被阻塞。
客户端可以使用重载版本的 get 方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了 Timeout-Exception。不过,也因为如此,你不会有机会发现计算商?价格的线程内到底发生了什么问题
才引发了这样的失效。为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> futurePrice = new CompletableFuture<>();new Thread( () -> {try {double price = calculatePrice(product);futurePrice.complete(price);} catch (Exception ex) {// 抛出导致失败的异常,完成这次Future操作futurePrice.completeExceptionally(ex);}}).start();return futurePrice;
}
客户端现在会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的Exception 参数,即价格计算方法最初抛出的异常。所以,举例来说,如果该方法抛出了一个运行时异常“product not available”,客户端就会得到像下面这样一段 ExecutionException :
java.util.concurrent.ExecutionException: java.lang.RuntimeException: productnot availableat java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)... 5 moreCaused by: java.lang.RuntimeException: product not availableat lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23)at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source)at java.lang.Thread.run(Thread.java:744)
使用工厂方法supplyAsync创建CompletableFuture
CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用supplyAsync方法后,你可以用以行语句重写上面的代码:
public Future<Double> getPriceAsync(String product) {return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象在完成异步执行后读取调用生产者方法的返回值。生产者会校验ForkJoinPool池中的某个执行线程运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行器执行生产者方法。
让你的代码免受阻塞之苦
你已经被要求进行“最佳价格查询器”应用的开发了,不过你需要查询的所有商店都只提供了同步API。换句话说,你有一个商家的列表,如下所示:
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),new Shop("LetsSaveBig"),new Shop("MyFavoriteShop"),new Shop("BuyItAll"));
传统的同步实现方式如下:
public List<String> findPrices(String product) {return shops.stream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))).collect(toList());
}
好吧,这段代码看起来非常直白。现在试着用该方法去查询myPhone27S产品。
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");
结果如下:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
is 214.13, BuyItAll price is 184.74]
Done in 4032 msecs
正如你预期的, findPrices 方法的执行时间仅比4秒钟多了那么几毫秒,因为对这4个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒钟的时间计算请求商商品的价格。你怎样才能改进这个结果呢?
使用并行流对请求进行并行操作
首先,我们应该想到的版本应该是把顺序流改成并行流,使用并行流来避免顺序计算。
public List<String> findPrices(String product) {return shops.parallelStream().map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))).collect(Collectors.toList());
}
``运行代码,发现新版findPrices的改进了吧。
```bash
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in 1180 msecs
相当不错啊!看起来这是个简单有效的注意:现在对四个不同商店的查询实现了并行,所以完成所以操作的总耗时只有1秒多一点儿。你能做到更好me?我们尝试使用刚学过的CompletableFuture,将findPrices方法中对不同商店的同步调用替换为异步调用。
使用CompletableFuture发起异步调用
我们可以使用工厂方法supplyAsyn创建CompletableFuture对象。如下所示:
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product))))
.collect(toList());
使用这种方式,你会得到一个List<CompletableFuture>,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称。但是,由于你用的CompletableFuture实现的findPrices方法要求返回一个List,你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。
为了实现这个效果,你可以向最初的List<CompletableFuture>施加第二个map操作,对List中的所有future对象执行join操作,一个接一个地等待它们运行结束。注意CompletableFuture类中的join方法和Future接口中的get有相同的含义,它们唯一的不同时join不会抛出任何检测到的异常,会抛出RunTimeException。使用它你不再需要使用try/catch语句让你传递给map方法的Lambda表达式变得过于臃肿。所有这些整合在一起,你就可以重新实现findPrices了,具体代码如下:
public List<String> findPrices(String product) {
// 使用CompletableFuture以异步方式计算每种商品的价格
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " +
shop.getPrice(product)))
.collect(Collectors.toList());
// 等待所有异步操作结束
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
// 使用Future的get接口,lambda表达式比较臃肿
priceFutures.stream().map(future -> {try {return future.get();} catch (Exception e) {System.out.println(e);}return null;}).collect(Collectors.toList());
注意到了吗?这里使用了两个不同的 Stream 流水线,而不是在同一个处理流的流水线上一个接一个地放置两个 map 操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建 CompletableFuture 对象只能在前一个操作结束之后执行查询指定商家的动作、通知 join方法返回计算结果。
图的上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完成结束之后,才能创建。此处相反,图的下半部分展示了如何先将CompletableFuture对象聚集到一个列表中(即图中椭圆的部分),让对象们可以在等待其他对象完成操作之前就能启动。
运行代码清单11-11中的代码来了解下第三个版本 findPrices 方法的性能,你会得到下面这几行输出:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in 2005 msecs
这个结果让人相当失望,不是吗?超过2?意味着利用 CompletableFuture 实现的版本,比刚开始代码清单11-8中原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,就让人更加沮丧。
与此形成鲜明对比的是,我们为采用 CompletableFutures 完成的新版方法做了大量的工作!但,这就是全部的真相吗?这种场景下使用 CompletableFutures 真的是浪费时间吗?或者我们可能漏掉了某些重要的东西?继续往下探究之前,让我们修息几分钟,尤其是想想你测试代码的机器是否足以以并行方式运行四个线程。
寻找更好的方案
并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果你想要增加第五个商家到商店列表中,让你的“最佳价格查询”应用对其进行处理,这时会发生什么情况?不不意外,顺序执行版本的执行还是需要大约5秒多钟的时间,下面是执行的输出:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 5025 msecs
非常不幸,并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态的)的四个线程现在都处于繁忙状态,都在对前4个商店进行查询。第五个查询只能等到前面某一个操作完成释放出空闲线程才能继续,它的运行结果如下:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 2177 msecs
CompletableFuture 版本的程序结果如何呢?我们也试着添加第5个商店对其进行了测试,结果如下:
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74, ShopEasy price is 176.08]
Done in 2006 msecs
CompletableFuture 版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。比如,如果你试图让你的代码处理9个商店,并行流版本耗时3143毫秒,而CompletableFuture 版本耗时3009毫秒。它们看起来不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。
使用定制的执行器
就这个主题而言,明智的选择似乎是创建一个配有线程的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?
调整线程池的大小
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brain Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = Ncpu * Ucpu * (1 + W/C)
其中:
Ncpu是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到。
Ucpu是期望的CPU利用率(该值应介于0和1之间)。
W/C是等待时间与计算时间的比率。
你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你期望的CPU利用率是100%,你需要创建一个拥有400个线程的线程池。实际操作中,如果你创建的线程数比商?的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size, 100), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {final Thread t = new Thread(r);// 使用守护线程一一这种方式不会组织程序的关停t.setDaemon(true);return t;}
});
注意,你现在正在创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护线程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。现在,你可以将执行器作为第二个参数传递给supplyAsync工厂方法了。比如,你现在可以安装下面的方式创建一个可查询指定商品价格的CompletableFuture对象:
CompletableFuture.supplyAsync(() -> shop.getname + " price is " + shop.getPrice(product), executor);
改进之后,使用 CompletableFuture 方案的程序处理5个商店仅耗时1021秒,处理9个商店时耗时1022秒。一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值100。这个例子证明了要创建更适合你的应用特性的执行器,利用 CompletableFutures 向其提交任务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。
并行一一使用流还是CompletableFuture
目前为止,你已经指定对集合进行并行计算有两种方式:要么将其转换为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发送阻塞。
我们对使用这些API的建议如下。
- 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用ParallelStream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没必要创建比处理器核更多的线程)。
反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络、磁盘等),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算来设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断导致什么触发了等待。
对多个异步任务进行流水线操作
让我们假设所以的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。你使用一个枚举型变量Discount.Code来实现这一想法,具体代码如下:
public class Discount {public enum Code {NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);private final int percentage;Code(int percentage) {this.percentage = percentage;}}
}
我们还假设所有的商店都同意修改getPrice方法的返回格式。getPrice现在以ShopName:price:DiscountCode的格式返回一个String类型的值。
public String getPrice(String product) {double price = calculatePrice(product);Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
调用 getPrice 方法可能会返回像下面这样一个 String 值:
BestPrice:123.26:GOLD
实现折扣服务
你的“最佳价格查询器”应用现在能从不同的商店取得商品价格,解析结果字符串,针对每个字符串,查询折扣服务取的折扣代码。这个流程决定了请求商品的最终折扣价格(每个折扣代码的实际折扣比率有可能发生变化,所以你每次都需要查询折扣服务)。我们已经将对商店返回字符串的解析操作封装到了下面的 Quote 类之中:
public class Quote {private final String shopName;private final double price;private final Discount.Code discountCode;public Quote(String shopName, double price, Discount.Code code) {this.shopName = shopName;this.price = price;this.discountCode = code;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.Code discountCode = Discount.Code.valueOf(split[2]);return new Quote(shopName, price, discountCode);}public String getShopName() { return shopName; }public double getPrice() { return price; }public Discount.Code getDiscountCode() { return discountCode; }
}
通过传递 shop 对象返回的字符串给静态工厂方法 parse ,你可以得到 Quote 类的一个实例,它包含了shop 的名称、折扣之前的价格,以及折扣代码。
Discount 服务还提供了一个 applyDiscount 方法,它接收一个 Quote 对象,返回一个字符串,表示生成该 Quote 的 shop 中的折扣价格,代码如下所示。
public class Discount {public enum Code {// 源码暂时省略}public static String applyDiscount(Quote quote) {return quote.getShopName() + " price is " +Discount.apply(quote.getPrice(),quote.getDiscountCode());}private static double apply(double price, Code code) {delay();return format(price * (100 - code.percentage) / 100);}
}
使用Discount服务
让我们再次使用 CompletableFuture 提供的特性,以异步方式重新实现 findPrices 方法。详细代码如下所示。如果你发现有些内容不太熟悉,不用太担心,我们很快会进行针对性的介绍。
public List<String> findPrices(String product) {List<CompletableFuture<String>> priceFutures =shops.stream()// 以异步方式取得每个shop中指定产品的原始价格.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))// 返回值转换为Quoto类型.map(future -> future.thenApply(Quote::parse))// 使用另一异步任务构造期望的Future,申请折扣.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))).collect(toList());// 等待流中的所有Future执行完毕,并提取各种的返回值return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}
这一次,事情看起来变得更加复杂了,所以让我们一步一步地理解到底发生了什么。这三次转换的流程如图11-5所示。
java 8的CompletableFuture API提供了名为thenCompose的方法,它允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFuture对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个参数。当第一个ComplatableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回作为输入计算第二个CompletableFuture对象。使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。
将这三次map操作的返回的Stream元素收集到一个列表,你就得到一个List<CompletableFuture>,等这些CompletableFuture对象最终执行完毕,你就可以使用join取得它们的返回值。
上面代码的输出结果如下:
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 2035 msecs
thenCompose方法向CompletableFuture类中其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池中,所以每个任务是由不同的线程处理的。就这个例子而言,第二个CompletableFuture对象的结果取决于第一个CompletableFuture,所以你使用哪个版本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少差别。我们选择thenCompose方法的原因是因为它更高效一些,因为少了很多线程切换的开销。
将两个CompletableFuture对象整合起来,无论它们是否存在依赖
你对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。但是,另一个比较常见的情况是,你需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等待第一个任务完全接收才开始第二项任务。
这种情况,你应该使用thenCombine方法,它接收名为BiFunction的第二个参数,这个参数定义了两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供有一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。
如下示例:
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),(price, rate) -> price * rate);
对Future和CompletableFuture的回顾
CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。
响应CompletableFuture的completion事件
目前为止,你实现的 findPrices 方法只有在取得所有商?的返回值时才显示商品的价格。而你希望的效果是,只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。你如何实现这种更进一步的改进要求呢?
public Stream<CompletableFuture<String>> findPricesStream(String product) {return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)).map(future -> future.thenApply(Quote::parse)).map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
注意,和你之前看到的 thenCompose 和 thenCombine 方法一样, thenAccept 方法也提供了一个异步版本,名为 thenAcceptAsync 。异步版本的方法会对处理结果的消费者进行调度,从线程池中选择一个新的线程继续执行,不再由同一个线程完成 CompletableFuture 的所有任务。因为你想要避免不必要的上下文切换,更重要的是你希望避免在等待线程上浪费时间,尽快响应 CompletableFuture 的 completion 事件,所以这里没有采用异步版本。
由于thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就会返回一个CompletableFuture。所以map操作返回的使用Stream<CompletableFuture>。对这个 <CompletableFuture> 对象,你能做的事非常有限,只能等待其运行结束,不过这也是你所期望的。你还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你可以把构成Stream 的所有CompletableFuture 对象放到一个数组中,等待所有的任务执行完成,代码如下所示。
CompletableFuture[] futures = findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println)).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
allOf工厂接收一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture对象。这意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 对象执行完毕,对 allOf 方法返回的CompletableFuture 执行 join 操作是个不错的主意。这个方法对“最佳价格查询器”应用也是有用的,因为你的用户可能会困?是否后面还有一些价格没有返回,使用这个方法,你可以在执行完毕之后打印输出一条消息“All shops returned results or timed out”。
然而在另一些场景中,你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待。这时候,你可以使用一个类似的工厂方法anyof。该方法接收一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象对方值构成的CompletableFuture。
小结
这一章中,你学到内容如下:
- 执行比较?时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改?程序的性能,加快程序的响应速度。
- 你应该尽可能地为客户提供异步API。使用 CompletableFuture 类提供的特性,你能够轻松地实现这一目标。
- CompletableFuture 类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。
- 将同步API的调用封装到一个 CompletableFuture 中,你能够以异步的方式使用其结果。
- 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个。
- 你可以为 CompletableFuture 注册一个回调函数(使用thenAccept方法),在 Future 执行完毕或者它们计算的结果可用时,针对性地执行一些程序。
- 你可以决定在什么时候结?程序的运行,是等待由 CompletableFuture 对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。
这篇关于CompletableFuture:组合式异步编程-Java 8的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!