资讯专栏INFORMATION COLUMN

CompletableFuture的执行线程

mo0n1andin / 2936人阅读

默认使用的线程池

不传executor时默认使用ForkJoinPool.commonPool()

IntStream.range(0, 15).parallel().forEach(i -> {
            System.out.println(Thread.currentThread());
        });

输出

Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

commonPool

This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.

thenRun测试实例1 不设定executor
    @Test
    public void testRunOnCommonPool() throws InterruptedException {

        CompletionStage futurePrice = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("test1:1 - runAsync(runnable), job thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
                }
        );

        System.out.println("test1:flag1");

        futurePrice.thenRun(() -> {
            System.out.println("test1:2 - thenRun(runnable)), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        });

        System.out.println("test1:flag2");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test1:3 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        });

        TimeUnit.SECONDS.sleep(100);

    }

输出

test1:flag1
test1:flag2
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:2 - thenRun(runnable)), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
设定executor
    @Test
    public void testRunOnExecutors() throws InterruptedException {
        CompletionStage futurePrice = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("test2:1 - runAsync(runnable, executor), job thread: " + Thread.currentThread());
            //Thread[pool-1-thread-1,5,main]
        }, executor);

        System.out.println("test2:flag1");

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:2 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
            //Thread[pool-1-thread-1,5,main]
        });

        System.out.println("test2:flag2");

        futurePrice.thenRun(() -> {
            System.out.println("test2:3 - thenRun(runnable), action thread: " + Thread.currentThread());
            //Thread[pool-1-thread-2,5,main]
        });

        futurePrice.thenRunAsync(() -> {
            System.out.println("test2:4 - thenRunAsync(runnable, executor), action thread: " + Thread.currentThread());
            //Thread[ForkJoinPool.commonPool-worker-1,5,main]
        }, executor);

        TimeUnit.SECONDS.sleep(100);
    }

输出

test2:flag1
test2:flag2
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test2:3 - thenRun(runnable), action thread: Thread[pool-1-thread-1,5,main]
test2:4 - thenRunAsync(runnable, executor), action thread: Thread[pool-1-thread-2,5,main]
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
thenRun测试实例2 没有sleep
@Test
    public void testThenRun(){
        CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("f1 thread:"+Thread.currentThread().getName());
            return "zero";
        }, executor);
        f1.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("then run thread:"+Thread.currentThread().getName());
                System.out.println("finished");
            }
        });
        TimeUnit.SECONDS.sleep(10);
    }

使用executor的输出

f1 thread:pool-1-thread-1
then run thread:main
finished

不使用executor的输出

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:main
finished
加上sleep
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("f1 thread:"+Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "zero";
        }, executor);
        f1.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("then run thread:"+Thread.currentThread().getName());
                System.out.println("finished");
            }
        });
        TimeUnit.SECONDS.sleep(10);

使用executor的输出

f1 thread:pool-1-thread-1
then run thread:pool-1-thread-1
finished

不使用executor的输出

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:ForkJoinPool.commonPool-worker-1
finished
小结

不带 async 的 thenRun() 方法仍然是一个异步方法,可能是使用main线程,commonPool的线程或者是executor的线程。

doc

ForkJoinPool.commonPool

理解 CompletableFuture 的任务与回调函数的线程

哪个线程执行 CompletableFuture’s tasks 和 callbacks?

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66703.html

相关文章

  • 《java 8 实战》读书笔记 -第十一章 CompletableFuture:组合式异步编程

    摘要:方法接受一个生产者作为参数,返回一个对象,该对象完成异步执行后会读取调用生产者方法的返回值。该方法接收一个对象构成的数组,返回由第一个执行完毕的对象的返回值构成的。 一、Future 接口 在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。...

    zhangqh 评论0 收藏0
  • Java 8 CompletableFuture 教程

    摘要:在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。如果我们不想等待结果返回,我们可以把需要等待完成执行的逻辑写入到回调函数中。任何立即执行完成那就是执行在主线程中尝试删除测试下。可以使用达成目的。 Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等。在本篇文章中我将详细解释清楚Compl...

    since1986 评论0 收藏0
  • Java8CompletableFuture进阶之道

    摘要:方法接收的是的实例,但是它没有返回值方法是函数式接口,无参数,会返回一个结果这两个方法是的升级,表示让任务在指定的线程池中执行,不指定的话,通常任务是在线程池中执行的。该的接口是在线程使用旧的接口,它不允许返回值。 简介 作为Java 8 Concurrency API改进而引入,本文是CompletableFuture类的功能和用例的介绍。同时在Java 9 也有对Completab...

    SunZhaopeng 评论0 收藏0
  • java并发编程学习14--CompletableFuture(一)

    摘要:并行流与目前,我们对集合进行计算有两种方式并行流而更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待而发生阻塞。 【回顾Future接口 Future接口时java5引入的,设计初衷是对将来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。比如,你去干洗店洗衣服,店员会告诉你什么时候可以来取衣服,而不是让你一直在干洗店等待。要使用Future只需...

    VioletJack 评论0 收藏0
  • 《Java8实战》-第十一章笔记(CompletableFuture:组合式异步编程)

    摘要:组合式异步编程最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第章中介绍的分支合并框架以及并行流是实现并行处理的宝贵工具它们将一个操作切分为多个子操作,在多个不同的核甚至是机器上并行地执行这些子操作。 CompletableFuture:组合式异步编程 最近这些年,两种趋势不断地推动我们反思我们设计软件的方式。第一种趋势和应用运行的硬件平台相关,第二种趋势与应用程序的架构相关...

    hlcfan 评论0 收藏0
  • java并发编程学习16--CompletableFuture(三)

    摘要:所以很容易出现某一个商店的数据迟迟无法返回的情况。工厂方法接受由对象构成的数组数组中所有的完成后它返回一个对象。异步的可以通过进行合并,无论他们之间是否有依赖关系。可以为注册一个回调函数,在执行完毕时使用。 【最佳价格查询器的优化 由于我们的两个远程服务:1.查询价格,2.查询折扣价格都是基于网络的。所以很容易出现某一个商店的数据迟迟无法返回的情况。由于这些原因,我希望查询器在查询时能...

    马忠志 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<