摘要:这个方法返回与等待所有返回等待多个返回取多个当中最快的一个返回等待多个当中最快的一个返回二详解终极指南并发编程中的风格
thenApply(等待并转化future)
@Test
public void testThen() throws ExecutionException, InterruptedException {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
return "zero";
}, executor);
CompletableFuture f2 = f1.thenApply(new Function() {
@Override
public Integer apply(String t) {
System.out.println(2);
return Integer.valueOf(t.length());
}
});
CompletableFuture f3 = f2.thenApply(r -> r * 2.0);
System.out.println(f3.get());
}
thenAccept与thenRun(监听future完成)
/**
* future完成处理,可获取结果
*/
@Test
public void testThenAccept(){
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
return "zero";
}, executor);
f1.thenAccept(e -> {
System.out.println("get result:"+e);
});
}
/**
* future完成处理
*/
@Test
public void testThenRun(){
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
return "zero";
}, executor);
f1.thenRun(new Runnable() {
@Override
public void run() {
System.out.println("finished");
}
});
}
thenCompose(flatMap future)
/**
* compose相当于flatMap,避免CompletableFuture>这种
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testThenCompose() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
return "zero";
}, executor);
CompletableFuture> f4 = f1.thenApply(CompletableFutureTest::calculate);
System.out.println("f4.get:"+f4.get().get());
CompletableFuture f5 = f1.thenCompose(CompletableFutureTest::calculate);
System.out.println("f5.get:"+f5.get());
System.out.println(f1.get());
}
public static CompletableFuture calculate(String input) {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println(input);
return input + "---" + input.length();
}, executor);
return future;
}
thenCombine与thenAcceptBoth
thenCombine(组合两个future,有返回值)
/**
* thenCombine用于组合两个并发的任务,产生新的future有返回值
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testThenCombine() throws ExecutionException, InterruptedException {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("f1 finish sleep at:"+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "zero";
}, executor);
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
Thread.sleep(3000);
System.out.println("f2 finish sleep at:"+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}, executor);
CompletableFuture reslutFuture =
f1.thenCombine(f2, new BiFunction() {
@Override
public String apply(String t, String u) {
System.out.println("f3 start to combine at:"+System.currentTimeMillis());
return t.concat(u);
}
});
System.out.println(reslutFuture.get());//zerohello
System.out.println("finish combine at:"+System.currentTimeMillis());
}
thenAcceptBoth(组合两个future,没有返回值)
/**
* thenAcceptBoth用于组合两个并发的任务,产生新的future没有返回值
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "zero";
}, executor);
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(3);
System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "hello";
}, executor);
CompletableFuture reslutFuture = f1.thenAcceptBoth(f2, new BiConsumer() {
@Override
public void accept(String t, String u) {
System.out.println("f3 start to accept at:"+System.currentTimeMillis());
System.out.println(t + " over");
System.out.println(u + " over");
}
});
System.out.println(reslutFuture.get());
System.out.println("finish accept at:"+System.currentTimeMillis());
}
applyToEither与acceptEither
applyToEither(取2个future中最先返回的,有返回值)
/**
* 当任意一个CompletionStage 完成的时候,fn 会被执行,它的返回值会当做新的CompletableFuture的计算结果
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testApplyToEither() throws ExecutionException, InterruptedException {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(5);
System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "fromF1";
}, executor);
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(2);
System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "fromF2";
}, executor);
CompletableFuture reslutFuture = f1.applyToEither(f2,i -> i.toString());
System.out.println(reslutFuture.get()); //should not be null , wait for complete
}
acceptEither(取2个future中最先返回的,无返回值)
/**
* 取其中返回最快的一个
* 当任意一个CompletionStage 完成的时候,action 这个消费者就会被执行。这个方法返回 CompletableFuture
*/
@Test
public void testAcceptEither() throws ExecutionException, InterruptedException {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(3);
System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "zero";
}, executor);
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(5);
System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "hello";
}, executor);
CompletableFuture reslutFuture = f1.acceptEither(f2,r -> {
System.out.println("quicker result:"+r);
});
reslutFuture.get(); //should be null , wait for complete
}
allOf与anyOf
allOf(等待所有future返回)
/**
* 等待多个future返回
*/
@Test
public void testAllOf() throws InterruptedException {
List> futures = IntStream.range(1,10)
.mapToObj(i ->
longCost(i)).collect(Collectors.toList());
final CompletableFuture allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
allCompleted.thenRun(() -> {
futures.stream().forEach(future -> {
try {
System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
});
Thread.sleep(100000); //wait
}
anyOf(取多个future当中最快的一个返回)
/**
* 等待多个future当中最快的一个返回
* @throws InterruptedException
*/
@Test
public void testAnyOf() throws InterruptedException {
List> futures = IntStream.range(1,10)
.mapToObj(i ->
longCost(i)).collect(Collectors.toList());
final CompletableFuture
doc
CompletableFuture(二)
Java CompletableFuture 详解
Java 8:CompletableFuture终极指南
Java 8: Definitive guide to CompletableFuture
并发编程 | JDK 1.8中的CompletableFuture | FRP风格
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/66708.html
摘要:方法接收的是的实例,但是它没有返回值方法是函数式接口,无参数,会返回一个结果这两个方法是的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在线程池中执行的。该的接口是在线程使用旧的接口,它不允许返回值。 简介 作为Java 8 Concurrency API改进而引入,本文是CompletableFuture类的功能和用例的介绍。同时在Java 9 也有对Completab...
摘要:组合式异步编程最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第章中介绍的分支合并框架以及并行流是实现并行处理的宝贵工具它们将一个操作切分为多个子操作,在多个不同的核甚至是机器上并行地执行这些子操作。 CompletableFuture:组合式异步编程 最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关...
摘要:首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎粉墨登场。然而提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。极大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 为了让程序更加高效,让CPU最大效率的工作,我们会采用异步编程...
摘要:内部类,用于对和异常进行包装,从而保证对进行只有一次成功。是取消异常,转换后抛出。判断是否使用的线程池,在中持有该线程池的引用。 前言 近期作者对响应式编程越发感兴趣,在内部分享JAVA9-12新特性过程中,有两处特性让作者深感兴趣:1.JAVA9中的JEP266对并发编程工具的更新,包含发布订阅框架Flow和CompletableFuture加强,其中发布订阅框架以java.base...
摘要:项目需求项目中需要优化一个接口,这个接口需要拉取个第三方接口,需求延迟时间小于技术选型是提出的一个支持非阻塞的多功能的,同样也是实现了接口,是添加的类,用来描述一个异步计算的结果。对进一步完善,扩展了诸多功能形成了。 项目需求: 项目中需要优化一个接口,这个接口需要拉取23个第三方接口,需求延迟时间小于200ms; 技术选型: CompletableFuture是JDK8提出的一个支持...
阅读 3088·2021-11-16 11:44
阅读 939·2019-08-30 15:55
阅读 3545·2019-08-30 15:52
阅读 3923·2019-08-30 15:43
阅读 2461·2019-08-30 11:21
阅读 667·2019-08-29 12:18
阅读 2187·2019-08-26 18:15
阅读 724·2019-08-26 10:32