java 8的CompletableFuture

JDK1.5开始提供了Future类,用来描述一个异步计算的结果。可以通过isDone()方法来判断操作是否执行完毕。你也可以调用get方法阻塞调用的线程,等待执行结束。或者使用cancel方法取消任务的执行。
比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> doubleFuture = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
// 一个耗时的操作
return doSomethingLongComputation();
}
});

// do something else.

try {
Double result = doubleFuture.get(1, TimeUnit.SECONDS);
System.out.println("result->" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}

可以看到,我们向线程池提交一个任务,线程池立即返回了一个Future。然后我们可以继续做自己的事情,不必等待任务执行完毕。在自己的事情做完后,调用Future的get方法等待任务执行完毕,并返回结果。调用get()即为等待任务执行完毕,不论多久;而它的带超时时间的重载方法则在指定的时间之内等待执行结果,避免无限期等待下去。实际使用,建议使用带超时时间的get方法。

不过Future接口有它的局限性。比如:

  • 将两个异步结束的结果合并为一个——这2个异步计算相互独立,同时第二个又依赖第一个的结果。
  • 等待Future集合的所有任务执行完毕。
  • 仅等待Future结合中最快结束的任务完成。(比如访问了2个提供相同服务的服务器)
  • 通过编程方式完成一个Future任务的执行——以手工设定异步执行结果。
  • 响应Future的完成事件——在Future的任务执行完毕时会受到通知,然后使用Future计算的结果进行下一步操作。

java 8提供的CompltableFuture继承Future,在Future的基础上实现了如上描述的功能。

以异步的方式执行任务

借助CompletableFuture的supplyAsync方法实现异步执行。

1
2
3
4
5
6
7
8
List<Shop> shops = ...;
String productName = ...;
// 使用supplyAsync以异步的方式计算商品价格
List<CompletableFuture<String>> futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(productname))))
.collect(Collectors.toList());
// 调用join方法等待任务都执行完毕
List<String> results = futures.stream().map(f -> f.join()).collect(Collectors.toList());

上面异步方法和Stream的parallelStream都是默认使用的是通用线程池,线程数为处理器个数。你可以通过Runtime.getRuntime().availableProcessors();得到。比如我的电脑是4核,那么就是使用的是4个线程的线程池。如果商家过多会导致执行效率较低。

幸好,supplyAsync还有还有一个带Executor参数的版本,可以指定执行器。

1
2
3
4
5
6
7
8
9
10
// 指定创建的线程数最多100个,如果商家数目不多余100个,就创建商家数目的线程。
ExecutorService executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), (r) -> {
Thread thread = new Thread(r);
thread.setDaemon(true); // 使用守护线程,这种方式不会阻止程序的关闭
return thread;
});
List<CompletableFuture<String>> futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(productname)), executor))
.collect(Collectors.toList());
List<String> results = futures.stream().map(f -> f.join()).collect(Collectors.toList());

线程池大小的估算
《java并发编程实战》中,指出线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
N(threads) = N(cpu) * U(cpu) * (1 + W/C)
其中:

  • N(cpu)是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到。
  • U(cpu)是期望的CPU利用率(值介于0和1之间)
  • W/C是等待时间与计算时间的比率
    在本例中,99%的时间都在等待商店的响应,所以估算出的W/C比率为100.这意味着你期望的CPU利用率是100%。你需要创建一个有400个线程的线程池。但是,在实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费,因为线程池中有些线程根本没有用到。所以,基于这种考虑将线程池的数目与你要查询的商店的数目设定为一个值,这个每个商店都对应一个服务线程。不过,为了避免商店数目过多导致服务器超负荷而崩溃,你需要设置一个上限,比如这里的100个线程。

将2个异步执行的结果合并为一个

我们假定商店提供了查询商品价格的服务getPrice,它会返回商品的价格和折扣代码,另外还提供了查询扣后价的方法。
另外提供了Quote类用于将getPrice返回的结果进行封装。

商家提供的getPrice方法如下:

1
2
3
4
5
6
7
public String getPrice1(String product) {
double price = caculatePrice(product);

Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
// 返回结果包括商品名称、商品价格和折扣代码,是一个字符串,它们用冒号隔开;
return String.format("%s:%.2f:%s",name,price,code);
}

Quote类的主要方法如下:

1
2
3
4
5
6
7
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
Double price = Double.parseDouble(split[1]);
Discount.Code code = Discount.Code.valueOf(split[2]);
return new Quote(shopName,price,code);
}

折扣类代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Discount {
/**
* 折扣代码和折扣率
*/
public enum Code {
NONE(0),SILVER(5),GOLD(10),PLATINUM(15),DIAMOND(20);

private final int percentage;

Code(int percentage) {
this.percentage = percentage;
}
};

/**
* 申请折扣
* @param quote
* @return
*/
public static String applyDiscount(Quote quote) {
return String.format("%s price is %.2f",quote.getShopName(),Discount.apply(quote.getPrice(),quote.getCode()));
}

public static double apply(Double price,Code code) {
delay();
return price * (100 - code.percentage)/100;
}

private static void delay() {
try {
long time = 500 + new Random().nextInt(2000);
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

我们需要实现的逻辑是:
1.查询商品的价格,得到商品名称、价格、折扣代码;
2.将1中的结果进行封装,得到Quote;
3.利用Discount的applyDiscount方法计算折扣价。
可以看到,第1步可以异步执行,而第2步只是简单的封装,不涉及远程服务和I/O操作,所以没必要使用异步。第3步由于需要联系远程Discount服务,根据折扣代码得到折扣率,所以它也以异步的方式执行。
我们的代码大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), (r) -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});
List<CompletableFuture<String>> futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice1(product), executor)) // 返回Stream<Future<String>>
.map(future -> future.thenApply(Quote::parse)) // 返回Stream<Future<Quote>>
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(Collectors.toList());

List<String> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

thenApply有点类似于JavaScript中的回调函数,它在Future结果计算完成后,将结果作为参数回调这个函数。它不会阻塞线程。
所以这里的意思就是在拿到商店的商品名称、价格、折扣代码后,将其封装为一个Quote对象。

thenCompose用于连接2个CompletableFuture,它会将第一个Future的结果作为参数传递给第2个Future。所以这里将第2个map的结果quote作为参数传递给了第2个Future,用于计算折后价。

将2个Future合并起来,无论他们是否有依赖

如果你希望将2个完全不相干的CompletableFuture的结果整合,那么可以使用CompletableFuture的thenCombine方法。

比如,你可以以异步的方式查询某个商店商品的价格,同时你可以以异步的方式从远程汇率服务查询欧元和美元的汇率,然后将商品的价格乘以当时的汇率,得到以美元计价的商品价格。

1
2
3
4
5
6
Future<Double> future = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) // 异步查询商品价格
.thenCombine(
// 异步计算汇率
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate //执行的合并操作
);

响应Future的完成事件

比如,我们要查询100个商店的某件商品的价格,可能某个商店的服务或者网络问题,导致查询耗时很久,如果按照之前的方式等待所有的任务执行完毕,那么给人的体验就不好。比如假定某个商店查询价格耗时100秒,那么用户可能看到页面上在这100秒内都是一片空白。其实,我们可以对CompletableFuture的complete做出响应,使用它的thenAccept方法可以在任务执行完毕收收到通知,方法的参数为Future执行的结果。这样只要有任务执行完毕,就会立即显示出来。
比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ExecutorService executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), (r) -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});

long start = System.nanoTime();
CompletableFuture[] futures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice1(product),executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor)))
.map(future -> future.thenAccept(s -> { // 这里一旦Future执行完毕,立即会调用该方法
System.out.println(s + " (done in " + (System.nanoTime() - start ) / 1_000_000 + "msecs.)");
}))
.toArray(length -> new CompletableFuture[length]);

CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in " + (System.nanoTime() - start ) / 1_000_000 + "msecs.");

结果如下:

1
2
3
4
5
6
7
JustByIt price is 135.63 (done in 1636msecs.)
BestPrice price is 72.03 (done in 2009msecs.)
BuyItAll price is 126.38 (done in 2442msecs.)
LetsSaveBig price is 139.40 (done in 3069msecs.)
MyFavoriteShop price is 115.09 (done in 3167msecs.)
All shops have now responded in 3168msecs.
Done in 3263 msecs

注意:这里用到了CompletableFuture的allOf方法,这里在所有任务执行完毕后,输出了一句话,表示任务完成。

仅等待Future结合中最快结束的任务完成

上面的例子使用了allOf方法,表示等待一组Future执行完毕。如果只是希望某个任务执行完毕即结束,那么可以使用anyOf方法。

通过编程方式完成一个Future任务的执行

假定在调用一个方法时,希望以异步的方式执行,我们在方法中使用一个线程来处理计算,然后立即返回了一个Future。那么我们可以计算完毕后,使用CompletableFuture的complete方法设置任务执行结果。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
// 异步方法
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> future = new CompletableFuture<>();
new Thread(){
@Override
public void run() {
double price = caculatePrice(product);
future.complete(price);
}
}.start();
return future;
}

CompletableFuture与异常处理

在上面的代码中,如果执行caculatePrice方法出现异常,会导致调用CompletableFuture的get方法永远等待下去。异常被限制在计算商品价格的线程范围,最后会杀死该线程,所以调用Future的get会一直等待。

当然,你可以使用带超时参数的get的重载方法来避免无限期等待,最终抛出一个TimeoutException,但这样你并不知道执行任务的线程到底出现了什么问题。

为了解决这个问题,你可以使用CompletableFuture的completeExceptionally将异常抛出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> future = new CompletableFuture<>();
new Thread(){
@Override
public void run() {
double price = caculatePrice(product);
try {
// 这里对0做除法并不会抛出异常,在不在线程中使用会抛出异常
//System.out.println(price / 0);
// 数组越界会抛出异常
int[] arr = {};
System.out.println(arr[1]);
future.complete(price);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
}.start();
return future;
}

比如,上面我们认为模拟了一个数组下标越界的异常。调用future的get方法的客户端会立即得到一个异常,get方法就不会无限期等待了。另外,调用get方法时,始终建议使用带超时参数的重载方法,避免无限等待。

另外,注意:CompletableFuture的所有异步方法都使用了同样的错误管理机制,你不用再花大力气去处理异常了。

参考《java8实战》

Donny wechat
欢迎关注我的个人公众号
打赏,是超越赞的一种表达。
Show comments from Gitment