资讯专栏INFORMATION COLUMN

Java 8 CompletableFuture 教程

since1986 / 1472人阅读

摘要:在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。如果我们不想等待结果返回,我们可以把需要等待完成执行的逻辑写入到回调函数中。任何立即执行完成那就是执行在主线程中尝试删除测试下。可以使用达成目的。

Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它所有方法的使用。

什么是CompletableFuture?

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个多带带的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

Future vs CompletableFuture

CompletableFuture 是 Future API的扩展。

Future 被用于作为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。

从 Callbale和 Future 教程可以学习更多关于 Future 知识.

Future API 是非常好的 Java 异步编程进阶,但是它缺乏一些非常重要和有用的特性。

Future 的局限性

不能手动完成
当你写了一个函数,用于通过一个远程API获取一个电子商务产品最新价格。因为这个 API 太耗时,你把它允许在一个独立的线程中,并且从你的函数中返回一个 Future。现在假设这个API服务宕机了,这时你想通过该产品的最新缓存价格手工完成这个Future 。你会发现无法这样做。

Future 的结果在非阻塞的情况下,不能执行更进一步的操作
Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你无法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。

多个 Future 不能串联在一起组成链式调用
有时候你需要执行一个长时间运行的计算任务,并且当计算任务完成的时候,你需要把它的计算结果发送给另外一个长时间运行的计算任务等等。你会发现你无法使用 Future 创建这样的一个工作流。

不能组合多个 Future 的结果
假设你有10个不同的Future,你想并行的运行,然后在它们运行未完成后运行一些函数。你会发现你也无法使用 Future 这样做。

没有异常处理
Future API 没有任务的异常处理结构居然有如此多的限制,幸好我们有CompletableFuture,你可以使用 CompletableFuture 达到以上所有目的。

CompletableFuture 实现了 FutureCompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

创建 CompletableFuture

1. 简单的例子
可以使用如下无参构造函数简单的创建 CompletableFuture:

CompletableFuture completableFuture = new CompletableFuture();

这是一个最简单的 CompletableFuture,想获取CompletableFuture 的结果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法会一直阻塞直到 Future 完成。因此,以上的调用将被永远阻塞,因为该Future一直不会完成。

你可以使用 CompletableFuture.complete() 手工的完成一个 Future:

completableFuture.complete("Future"s Result")

所有等待这个 Future 的客户端都将得到一个指定的结果,并且 completableFuture.complete() 之后的调用将被忽略。

2. 使用 runAsync() 运行异步计算
如果你想异步的运行一个后台任务并且不想改任务返回任务东西,这时候可以使用 CompletableFuture.runAsync()方法,它持有一个Runnable 对象,并返回 CompletableFuture

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I"ll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也可以以 lambda 表达式的形式传入 Runnable 对象:

// Using Lambda Expression
CompletableFuture future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I"ll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表达式会比较频繁,如果以前你没有使用过,建议你也多使用lambda 表达式。

3. 使用 supplyAsync() 运行一个异步任务并且返回结果
当任务不需要返回任何东西的时候, CompletableFuture.runAsync() 非常有用。但是如果你的后台任务需要返回一些结果应该要怎么样?

CompletableFuture.supplyAsync() 就是你的选择。它持有supplier 并且返回CompletableFutureT 是通过调用 传入的supplier取得的值的类型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法可以写入你的后台任务中,并且返回结果。

你可以使用lambda表达式使得上面的示例更加简明:

// Using Lambda Expression
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
一个关于Executor 和Thread Pool笔记
你可能想知道,我们知道runAsync() supplyAsync()方法在多带带的线程中执行他们的任务。但是我们不会永远只创建一个线程。
CompletableFuture可以从全局的 ForkJoinPool.commonPool()获得一个线程中执行这些任务。
但是你也可以创建一个线程池并传给runAsync() supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。
CompletableFuture API 的所有方法都有两个变体-一个接受Executor作为参数,另一个不这样:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture  runAsync(Runnable runnable)
static CompletableFuture  runAsync(Runnable runnable, Executor executor)
static  CompletableFuture supplyAsync(Supplier supplier)
static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)

创建一个线程池,并传递给其中一个方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);
在 CompletableFuture 转换和运行

CompletableFuture.get()方法是阻塞的。它会一直等到Future完成并且在完成后返回结果。
但是,这是我们想要的吗?对于构建异步系统,我们应该附上一个回调给CompletableFuture,当Future完成的时候,自动的获取结果。
如果我们不想等待结果返回,我们可以把需要等待Future完成执行的逻辑写入到回调函数中。

可以使用 thenApply(), thenAccept()thenRun()方法附上一个回调给CompletableFuture。

1. thenApply()
可以使用 thenApply() 处理和改变CompletableFuture的结果。持有一个Function作为参数。Function是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。

// Create a CompletableFuture
CompletableFuture whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也可以通过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另外一个 thenApply方法。

CompletableFuture welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun()
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept() thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
CompletableFuture.thenAccept() 持有一个Consumer ,返回一个CompletableFuture。它可以访问CompletableFuture的结果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

虽然thenAccept()可以访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
异步回调方法的笔记
CompletableFuture提供的所有回调方法都有两个变体:
`// thenApply() variants
CompletableFuture thenApply(Function fn)
CompletableFuture thenApplyAsync(Function fn)
CompletableFuture thenApplyAsync(Function fn, Executor executor)`
这些异步回调变体通过在独立的线程中执行回调任务帮助你进一步执行并行计算。
以下示例:
CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任务和在supplyAsync()中的任务执行在相同的线程中。任何supplyAsync()立即执行完成,那就是执行在主线程中(尝试删除sleep测试下)。
为了控制执行回调任务的线程,你可以使用异步回调。如果你使用thenApplyAsync()回调,将从ForkJoinPool.commonPool()获取不同的线程执行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,如果你传入一个ExecutorthenApplyAsync()回调中,,任务将从Executor线程池获取一个线程执行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);
组合两个CompletableFuture

1. 使用 thenCompose() 组合两个独立的future
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另外一个服务中获取他的贷方。
考虑下以下两个方法getUserDetail() getCreditRating()的实现:

CompletableFuture getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

现在让我们弄明白当使用了thenApply()后是否会达到我们期望的结果-

CompletableFuture> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函数传入thenApply将返回一个简单的值,但是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
如果你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-

CompletableFuture result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

因此,规则就是-如果你的回调函数返回一个CompletableFuture,但是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可以使用thenCompose()

2. 使用thenCombine()组合两个独立的 future
虽然thenCompose()被用于当一个future依赖另外一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来做一些事情。

System.out.println("Retrieving weight.");
CompletableFuture weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。

组合多个CompletableFuture

我们使用thenCompose() thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。

static CompletableFuture allOf(CompletableFuture... cfs)
static CompletableFuture anyOf(CompletableFuture... cfs)

1. CompletableFuture.allOf()
CompletableFuture.allOf的使用场景是当你一个列表的独立future,并且你想在它们都完成后并行的做一些事情。

假设你想下载一个网站的100个不同的页面。你可以串行的做这个操作,但是这非常消耗时间。因此你想写一个函数,传入一个页面链接,返回一个CompletableFuture,异步的下载页面内容。

CompletableFuture downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page"s content
    });
} 

现在,当所有的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可以使用CompletableFuture.allOf()达成目的。

List webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的问题是它返回CompletableFuture。但是我们可以通过写一些额外的代码来获取所有封装的CompletableFuture结果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些时间理解下以上代码片段。当所有future完成的时候,我们调用了future.join(),因此我们不会在任何地方阻塞。

join()方法和get()方法非常类似,这唯一不同的地方是如果最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。

现在让我们计算包含关键字页面的数量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的一样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。以下示例:

CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。因为future2的休眠时间最少,因此她最先完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFutureCompletableFuture.anyOf()的问题是如果你的CompletableFuture返回的结果是不同类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

CompletableFuture 异常处理

我们探寻了怎样创建CompletableFuture,转换它们,并组合多个CompletableFuture。现在让我们弄明白当发生错误的时候我们应该怎么做。

首先让我们明白在一个回调链中错误是怎么传递的。思考下以下回调链:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

1. 使用 exceptionally() 回调处理异常
exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你可以在这里记录这个异常并返回一个默认值。

Integer age = -1;

CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 

2. 使用 handle() 方法处理异常
API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

Integer age = -1;

CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

如果异常发生,res参数将是 null,否则,ex将是 null。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69149.html

相关文章

  • Java8CompletableFuture进阶之道

    摘要:方法接收的是的实例,但是它没有返回值方法是函数式接口,无参数,会返回一个结果这两个方法是的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在线程池中执行的。该的接口是在线程使用旧的接口,它不允许返回值。 简介 作为Java 8 Concurrency API改进而引入,本文是CompletableFuture类的功能和用例的介绍。同时在Java 9 也有对Completab...

    SunZhaopeng 评论0 收藏0
  • Java 8原生API也可以开发响应式代码?

    摘要:中使用了提供的原生接口对自身的异步化做了改进。可以支持和两种调用方式。实战通过下面的例子,可以看出的最大好处特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段时间工作上比较忙,这篇文章一直没来得及写,本文是阅读《Java8实战》的时候,了解到Java 8里已经提供了一个异步...

    HtmlCssJs 评论0 收藏0
  • java 8 实战》读书笔记 -第十一章 CompletableFuture:组合式异步编程

    摘要:方法接受一个生产者作为参数,返回一个对象,该对象完成异步执行后会读取调用生产者方法的返回值。该方法接收一个对象构成的数组,返回由第一个执行完毕的对象的返回值构成的。 一、Future 接口 在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。...

    zhangqh 评论0 收藏0
  • 强大的CompletableFuture

    摘要:首先想到的是开启一个新的线程去做某项工作。再进一步,为了让新线程可以返回一个值,告诉主线程事情做完了,于是乎粉墨登场。然而提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。极大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 为了让程序更加高效,让CPU最大效率的工作,我们会采用异步编程...

    skinner 评论0 收藏0
  • java8CompletableFuture使用实例

    摘要:这个方法返回与等待所有返回等待多个返回取多个当中最快的一个返回等待多个当中最快的一个返回二详解终极指南并发编程中的风格 thenApply(等待并转化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...

    kycool 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<