资讯专栏INFORMATION COLUMN

《java 8 实战》读书笔记 -第十一章 CompletableFuture:组合式异步编程

zhangqh / 391人阅读

摘要:方法接受一个生产者作为参数,返回一个对象,该对象完成异步执行后会读取调用生产者方法的返回值。该方法接收一个对象构成的数组,返回由第一个执行完毕的对象的返回值构成的。

一、Future 接口

在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。Future的另一个优点是它比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。

ExecutorService executor = Executors.newCachedThreadPool(); 
Future future = executor.submit(new Callable() { 
 public Double call() {
 return doSomeLongComputation(); 
 }}); 
doSomethingElse(); //异步操作进行的同时,你可以做其他的事情
try { 
 Double result = future.get(1, TimeUnit.SECONDS); //获取异步操作的结果,如果最终被阻塞,无法得到结
//果,那么在最多等待1秒钟之后退出
} catch (ExecutionException ee) {
 // 计算抛出一个异常
} catch (InterruptedException ie) { 
 // 当前线程在等待过程中被中断
} catch (TimeoutException te) { 
 // 在Future对象完成之前超过已过期
}
同步API与异步API 

同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。

与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。

二、实现异步 API

使用CompletableFuture后,getPriceAsync方法的实现

public Future getPriceAsync(String product) {
 CompletableFuture futurePrice = new CompletableFuture<>(); 
 new Thread( () -> { 
 double price = calculatePrice(product); //calculatePrice需长时间计算,任务结束并得出结果时设置
//Future的返回值
 futurePrice.complete(price); 
 }).start(); 
 return futurePrice; //无需等待还没结束的计算,直接返回Future对象
}

使用异步API:

Shop shop = new Shop("BestShop"); 
long start = System.nanoTime(); 
Future futurePrice = shop.getPriceAsync("my favorite product"); 
long invocationTime = ((System.nanoTime() - start) / 1_000_000); 
System.out.println("Invocation returned after " + invocationTime 
 + " msecs"); 
// 执行更多任务,比如查询其他商店
doSomethingElse(); 
// 在计算商品价格的同时
try { 
 double price = futurePrice.get(); //从Future对象中读取价格,如果价格未知,会发生阻塞
 System.out.printf("Price is %.2f%n", price); 
} catch (Exception e) { 
 throw new RuntimeException(e);
} 
long retrievalTime = ((System.nanoTime() - start) / 1_000_000); 
System.out.println("Price returned after " + retrievalTime + " msecs");
Stream和CompletableFuture的设计都遵循了类似的模式:它们都使用了Lambda表达式以及流水线的思想。CompletableFuture和Future的关系就跟Stream和Collection的关系一样。

错误处理
如果计算商品价格的方法出现异常,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。为了避免这种情况,你需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
抛出CompletableFuture内的异常:

public Future getPriceAsync( String product )
{
    CompletableFuture futurePrice = new CompletableFuture<>();
    new Thread( () - > {
                try {
                    double price = calculatePrice( product );
                    futurePrice.complete( price );
                } catch ( Exception ex ) {
                    futurePrice.completeExceptionally( ex );
                }
            } ).start();
    return(futurePrice);
}

使用工厂方法supplyAsync创建CompletableFuture对象:

public Future getPriceAsync(String product) { 
 return CompletableFuture.supplyAsync(() -> calculatePrice(product)); 
}

此处getPriceAsync方法返回的CompletableFuture对象和上面你手工创建和完成的CompletableFuture对象是完全等价的,这意味着它提供了同样的错误管理机制supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。

三、让你的代码免受阻塞之苦

在所有店铺中找出同一商品的价格,使用CompletableFuture实现findPrices方法

public List findPrices(String product) { 
 List> priceFutures = 
 shops.stream() 
 .map(shop -> CompletableFuture.supplyAsync( 
 () -> shop.getName() + " price is " +
 shop.getPrice(product))) 
 .collect(Collectors.toList()); 
 return priceFutures.stream() 
 .map(CompletableFuture::join) 
 .collect(toList()); 
}

这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。

CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。

使用定制的执行器:

调整线程池的大小
Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
❑NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
❑UCPU是期望的CPU利用率(该值应该介于0和1之间)
❑W/C是等待时间与计算时间的比率

实际操作中,如果你创建
的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。为“最优价格查询器”应用定制的执行器:

private final Executor executor = 
 Executors.newFixedThreadPool(Math.min(shops.size(), 100), 
 new ThreadFactory() { 
 public Thread newThread(Runnable r) { 
 Thread t = new Thread(r); 
 t.setDaemon(true); 
 return t; 
 }
});
并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
我们对使用这些API的建议如下。
❑如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
❑反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。
四、对多个异步任务进行流水线操作 1.thenCompose

使用CompletableFuture实现findPrices方法(获取商品折扣后价格):

public List findPrices(String product) {
    List> priceFutures =
     shops.stream() 
     .map(shop -> CompletableFuture.supplyAsync( 
     () -> shop.getPrice(product), executor))//getPrice耗时操作,获取商品的价格字符串,使用异步方式
     .map(future -> future.thenApply(Quote::parse)) //将价格字符串解析成Quote对象(包装了价格,折扣率等)
     .map(future -> future.thenCompose(quote -> 
     CompletableFuture.supplyAsync( 
     () -> Discount.applyDiscount(quote), executor))) //异步计算商品最终价格
     .collect(toList());
    return priceFutures.stream() 
     .map(CompletableFuture::join) //等待流中的所有Future执行完毕,并提取各自的返回值
     .collect(toList());
}
thenapply()是返回的是非CompletableFuture类型:它的功能相当于将CompletableFuture转换成CompletableFuture
thenCompose()用来连接两个CompletableFuture,返回值是新的CompletableFuture:
thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。
CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async
的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。
2.用thenCombine将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖

thenCombine方法,它接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供有一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行

eg:有一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户:

Future futurePriceInUSD = 
 CompletableFuture.supplyAsync(() -> shop.getPrice(product)) 
 .thenCombine( 
 CompletableFuture.supplyAsync( 
 () -> exchangeService.getRate(Money.EUR, Money.USD)), 
 (price, rate) -> price * rate 
 );
五、响应 CompletableFuture 的 completion 事件

只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。Java 8的CompletableFuture通 过thenAccept方法提供了这一功能,它接收CompletableFuture执行完毕后的返回值做参数。

重构findPrices方法返回一个由Future构成的流

public Stream> findPricesStream(String product) { 
 return shops.stream() 
 .map(shop -> CompletableFuture.supplyAsync( 
 () -> shop.getPrice(product), executor)) 
 .map(future -> future.thenApply(Quote::parse)) 
 .map(future -> future.thenCompose(quote -> 
 CompletableFuture.supplyAsync( 
 () -> Discount.applyDiscount(quote), executor))); 
}

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

由 于thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture。对这个>对象,你能做的事非常有限,只能等待其运行结束。

你还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你可以把构成Stream的所有CompletableFuture对象放到一个数组中,等待所有的任务执行完成,代码如下所示:

CompletableFuture[] futures = findPricesStream("myPhone") 
 .map(f -> f.thenAccept(System.out::println)) 
 .toArray(size -> new CompletableFuture[size]); 
CompletableFuture.allOf(futures).join();

allOf工厂方法接收一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture对象。这意味着,如果你需要等待最初Stream中的所有 CompletableFuture对象执行完毕,对 allOf方法返回的
CompletableFuture执行join操作是个不错的主意。

你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法anyOf。该方法接收一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74370.html

相关文章

  • Java 8原生API也可以开发响应式代码?

    摘要:中使用了提供的原生接口对自身的异步化做了改进。可以支持和两种调用方式。实战通过下面的例子,可以看出的最大好处特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段时间工作上比较忙,这篇文章一直没来得及写,本文是阅读《Java8实战》的时候,了解到Java 8里已经提供了一个异步...

    HtmlCssJs 评论0 收藏0
  • Java8CompletableFuture进阶之道

    摘要:方法接收的是的实例,但是它没有返回值方法是函数式接口,无参数,会返回一个结果这两个方法是的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在线程池中执行的。该的接口是在线程使用旧的接口,它不允许返回值。 简介 作为Java 8 Concurrency API改进而引入,本文是CompletableFuture类的功能和用例的介绍。同时在Java 9 也有对Completab...

    SunZhaopeng 评论0 收藏0
  • Java 8 CompletableFuture 教程

    摘要:在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。如果我们不想等待结果返回,我们可以把需要等待完成执行的逻辑写入到回调函数中。任何立即执行完成那就是执行在主线程中尝试删除测试下。可以使用达成目的。 Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等。在本篇文章中我将详细解释清楚Compl...

    since1986 评论0 收藏0
  • java并发编程学习14--CompletableFuture(一)

    摘要:并行流与目前,我们对集合进行计算有两种方式并行流而更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待而发生阻塞。 【回顾Future接口 Future接口时java5引入的,设计初衷是对将来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。比如,你去干洗店洗衣服,店员会告诉你什么时候可以来取衣服,而不是让你一直在干洗店等待。要使用Future只需...

    VioletJack 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    supernavy 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    ddongjian0000 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<