资讯专栏INFORMATION COLUMN

Java多线程(2):使用线程池 ThreadPoolExecutor

darry / 1290人阅读

摘要:本文只介绍中线程池的基本使用,不会过多的涉及到线程池的原理。可缓存线程的线程池创建一个可缓存线程的线程池。首先是从接口继承到的方法使用该方法即将一个任务交给线程池去执行。方法方法的作用是向线程池发送关闭的指令。

首先,我们为什么需要线程池
让我们先来了解下什么是 对象池 技术。某些对象(比如线程,数据库连接等),它们创建的代价是非常大的 —— 相比于一般对象,它们创建消耗的时间和内存都很大(而且这些对象销毁的代价比一般对象也大)。所以,如果我们维护一个 ,每次使用完这些对象之后,并不销毁它,而是将其放入池中,下次需要使用时就直接从池中取出,便可以避免这些对象的重复创建;同时,我们可以固定 池的大小,比如设置池的大小为 N —— 即池中只保留 N 个这类对象 —— 当池中的 N 个对象都在使用中的时候,为超出数量的请求设置一种策略,比如 排队等候 或者 直接拒绝请求 等,从而避免频繁的创建此类对象。
线程池 即对象池的一种(池中的对象为线程 Thread),类似的还有 数据库连接池(池中对象为数据库连接 Connection)。合理利用线程池能够带来三个好处(参考文末的 References[1]):

降低资源消耗,通过重复利用已创建的线程,降低线程创建和销毁时造成的时间和内存上的消耗;

提升响应速度,当任务到达时,直接使用线程池中的线程来运行任务,使得任务可以不需要等到线程创建就能立即执行;

提高线程的可管理性,线程是开销很大的对象,如果无限制的创建线程,不仅会快速消耗系统资源,还会降低系统的稳定性;而使用线程池可以对线程进行统一的分配和调控。

本文只介绍 Java 中线程池的基本使用,不会过多的涉及到线程池的原理。如果有兴趣的读者需要深入理解线程池的实现原理,可以参考文末的 References

JDK 中线程池的基础架构如下:

执行器 Executor 是顶级接口,只包含了一个 execute 方法,用来执行一个 Runnable 任务:

执行器服务 ExecutorService 接口继承了 Executor 接口,ExecutorService 是所有线程池的基础接口,它定义了 JDK 中线程池应该实现的基本方法:

线程池执行器 ThreadPoolExecutor 是基础线程池的核心实现,并且可以通过定制 ThreadPoolExecutor 的构造参数或者继承 ThreadPoolExecutor,实现自己的线程池;

ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,是能执行周期性任务或定时任务的线程池;

ForkJoinPool 是 JDK1.7 时添加的类,作为对 Fork/Join 型线程池的实现。

本文只介绍 ThreadPoolExecutor 线程池的使用,ScheduledThreadPoolExecutorForkJoinPool 会在之后的文章中介绍。

查看 ThreadPoolExecutor 的源码可知,在 ThreadPoolExecutor 的内部,将每个池中的线程包装为了一个 Worker


然后在 ThreadPoolExecutor 中定义了一个 HashSet,作为 “池”

设置一个合适的线程池(即自定义 ThreadPoolExecutor)是比较麻烦的,因此 JDK 通过 Executors 这个工厂类为我们提供了一些预先定义好的线程池:

1、固定大小的线程池

创建一个包含 nThreads 个工作线程的线程池,这 nThreads 个线程共享一个无界队列(即不限制大小的队列);当新任务提交到线程池时,如果当前没有空闲线程,那么任务将放入队列中进行等待,直到有空闲的线程来从队列中取出该任务并运行。

(通过 Runtime.getRuntime().availableProcessors() 可以获得当前机器可用的处理器个数,对于计算密集型的任务,固定大小的线程池的 nThreads 设置为这个值时,一般能获得最大的 CPU 使用率)

2、单线程线程池

创建一个只包含一个工作线程的线程池,它的功能可以简单的理解为 即 newFixedThreadPool 方法传入参数为 1 的情况。但是与 newFixedThreadPool(1) 不同的是,如果线程池中这个唯一的线程意外终止,线程池会创建一个新线程继续执行之后的任务。

3、可缓存线程的线程池

创建一个可缓存线程的线程池。当新任务提交到线程池时,如果当前线程池中有空闲线程可用,则使用空闲线程来运行任务,否则新建一个线程来运行该任务,并将该线程添加到线程池中;而且该线程池会终止并移除那些超过 60 秒未被使用的空闲线程。所以这个线程池表现得就像缓存,缓存的资源为线程,缓存的超时时间为 60 秒。根据 JDK 的文档,当任务的运行时间都较短的时候,该线程池有利于提高性能。

我们看到每个构造线程池的工厂方法都有一个带 ThreadFactory 的重载形式。ThreadFactory 即线程池用来新建线程的工厂,每次线程池需要新建一个线程时,调用的就是这个 ThreadFactorynewThread 方法:

(如果不提供自定义的 ThreadFactory,那么使用的就是 DefaultThreadFactory —— Executors 内定义的内部类)
比如我们要为线程池中的每个线程提供一个特定的名字,那么我们就可以自定义 ThreadFactory 并重写其 newThread 方法:

public class SimpleThreadFactory implements ThreadFactory {

    private AtomicInteger id = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("Test_Thread-" + id.getAndIncrement());
        return thread;
    }

}

通过 JDK 的源码我们可以知道,以上三种线程池的实现都是基于 ThreadPoolExecutor

下面我们来看一下线程池的基础接口 ExecutorService 中每个方法的含义。
首先是从 Executor 接口继承到的 execute 方法:

使用该方法即将一个 Runnable 任务交给线程池去执行。

submit 方法:

submit 方法会提交一个任务去给线程池执行,该任务可以是带返回结果的 Callable 任务,也可以是一开始就指定结果的 Runnable 任务,或者不带结果的 Runnable 任务(此时即一开始指定结果为 null)。submit 方法会返回一个与所提交任务相关联的 Future。通过 上一篇文章 我们可以知道,Futureget 方法可以等待任务执行完毕并返回结果。所以通过 Future,我们可以与已经提交到线程池的任务进行交互。submit 提交任务及任务运行过程大致如下:

向线程池提交一个 Runnable 或者 Callable 任务;

将 任务 作为参数使用 newTaskFor 方法构造出 FutureTask

(由 上一篇文章 可知,FutureTask 实现了 RunnableFuture 两个接口,从而 FutureTask 可以作为 Runnable 交给 WorkerThread)去运行,也可以作为一个 Future 与任务交互)

![newTaskFor 方法][19]

线程池使用 execute 方法将 FutureTask 交给当前的 Worker 去运行,并将 FutureTaskFuture 返回;

然后 Worker 执行任务(即运行 run 方法),在任务完成后,为 FutureFutureTask) 设置结果 —— 设置结果之前,调用 Futureget 方法会让调用线程处于阻塞状态;

通过 Futureget 方法,获得任务的结果。

invokeAll 方法:

invokeAll 方法可以一次执行多个任务,但它并不同等于多次调用 submit 方法。submit 方法是非阻塞的,每次调用 submit 方法提交任务到线程池之后,会立即返回与任务相关联的 Future,然后当前线程继续向后执行;

invokeAll 方法是阻塞的,只有当提交的多个任务都执行完毕之后,invokeAll 方法才会返回,执行结果会以List> 返回,该 List> 中的每个 Future 是和提交任务时的 Collection> 中的任务 Callable 一 一对应的。带 timeout 参数的 invokeAll 就是设置一个超时时间,如果超过这个时间 invokeAll 中提交的所有任务还有没全部执行完,那么没有执行完的任务会被取消(中断),之后同样以一个 List> 返回执行的结果。

invokeAny 方法:

invokeAny 方法也是阻塞的,与 invokeAll 方法的不同之处在于,当所提交的一组任务中的任何一个任务完成之后,invokeAny 方法便会返回(返回的结果便是那个已经完成的任务的返回值),而其他任务会被取消(中断)。

举一个 invokeAny 使用的例子:电脑有 C、D、E、F 四个盘,我们需要找一个文件,但是我们不知道这个文件位于哪个盘中,我们便可以使用 invokeAny,并提交四个任务(对应于四个线程)分别查找 C、D、E、F 四个盘,如果哪个线程找到了这个文件,那么此时 invokeAny 便停止阻塞并返回结果,同时取消其他任务。

shutdown 方法:

shutdown 方法的作用是向线程池发送关闭的指令。一旦在线程池上调用 shutdown 方法之后,线程池便不能再接受新的任务;如果此时还向线程池提交任务,那么将会抛出 RejectedExecutionException 异常。之后线程池不会立刻关闭,直到之前已经提交到线程池中的所有任务(包括正在运行的任务和在队列中等待的任务)都已经处理完成,才会关闭。

shutdownNow 方法:

shutdown 不同,shutdownNow 会立即关闭线程池 —— 当前在线程池中运行的任务会全部被取消,然后返回线程池中所有正在等待的任务。

(值得注意的是,我们 必须显式的关闭线程池,否则线程池不会自己关闭)

awaitTermination 方法:

awaitTermination 可以用来判断线程池是否已经关闭。调用 awaitTermination 之后,在 timeout 时间内,如果线程池没有关闭,则阻塞当前线程,否则返回 true;当超过 timeout 的时间后,若线程池已经关闭则返回 true,否则返回 false。该方法一般这样使用:

任务全部提交完毕之后,我们调用 shutdown 方法向线程池发送关闭的指令;

然后我们通过 awaitTermination 来检测到线程池是否已经关闭,可以得知线程池中所有的任务是否已经执行完毕;

线程池执行完已经提交的所有任务,并将自己关闭;

调用 awaitTermination 方法的线程停止阻塞,并返回 true

isShutdown() 方法,如果线程池已经调用 shutdown 或者 shutdownNow,则返回 true,否则返回 false

isTerminated() 方法,如果线程池已经调用 shutdown 并且线程池中所有的任务已经执行完毕,或者线程池调用了 shutdownNow,则返回 true,否则返回 false

通过以上介绍,我们已经了解了 ExecutorService 中所有方法的功能,现在让我们来实践 ExecutorService 的功能。

我们继续使用 上一篇文章 的两个例子中的任务,首先是任务类型为 Runnable 的情况:

import java.util.*;
import java.util.concurrent.*;

public class RunnableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用线程池运行 Runnable 任务:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建大小固定为 5 的线程池

        List tasks = new ArrayList<>(10);

        for (int i = 0; i < 10; i++) {
            AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10);
            tasks.add(task);
            
            threadPool.execute(task); // 让线程池执行任务 task
        }
        threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕之后,线程池会关闭

        threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待线程池关闭,等待的最大时间为 1 小时

        int total = 0;
        for (AccumRunnable task : tasks) {
            total += task.getResult(); // 调用在 AccumRunnable 定义的 getResult 方法获得返回的结果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumRunnable implements Runnable {

        private final int begin;
        private final int end;

        private int result;

        public AccumRunnable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public void run() {
            result = 0;
            try {
                for (int i = begin; i <= end; i++) {
                    result += i;
                    Thread.sleep(100);
                }
            } catch (InterruptedException ex) {
                ex.printStackTrace(System.err);
            }
            System.out.printf("(%s) - 运行结束,结果为 %d
",
                    Thread.currentThread().getName(), result);
        }

        public int getResult() {
            return result;
        }
    }
}

运行结果:

可以看到 NetBeans 给出的运行时间为 2 秒 —— 因为每个任务需要 1 秒的时间,而线程池中的线程个数固定为 5 个,所以线程池最多同时处理 5 个任务,因而 10 个任务总共需要 2 秒的运行时间。

任务类型为 Callable

import java.util.*;
import java.util.concurrent.*;

public class CallableTest {

    public static void main(String[] args) throws Exception {
        System.out.println("使用线程池运行 Callable 任务:");
        
        ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建大小固定为 5 的线程池
        
        List> futures = new ArrayList<>(10);
        
        for (int i = 0; i < 10; i++) {
            AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10);
            Future future = threadPool.submit(task); // 提交任务
            futures.add(future);
        }
        threadPool.shutdown(); // 向线程池发送关闭的指令,等到已经提交的任务都执行完毕之后,线程池会关闭

        int total = 0;
        for (Future future : futures) {
            total += future.get(); // 阻塞,直到任务结束,返回任务的结果
        }

        System.out.println("Total: " + total);
    }

    static final class AccumCallable implements Callable {

        private final int begin;
        private final int end;

        public AccumCallable(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        public Integer call() throws Exception {
            int result = 0;
            for (int i = begin; i <= end; i++) {
                result += i;
                Thread.sleep(100);
            }
            System.out.printf("(%s) - 运行结束,结果为 %d
",
                    Thread.currentThread().getName(), result);

            return result;
        }

    }

}

运行结果:

改写上面的代码,使用 invokeAll 来直接执行一组任务:

public static void main(String[] args) throws Exception {
    System.out.println("使用线程池 invokeAll 运行一组 Callable 任务:");

    ExecutorService threadPool = Executors.newFixedThreadPool(5); // 创建大小固定为 5 的线程池

    List tasks = new ArrayList<>(10); // tasks 为一组任务
    for (int i = 0; i < 10; i++) {
        tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); 
    }

    List> futures = threadPool.invokeAll(tasks); // 阻塞,直到所有任务都运行完毕

    int total = 0;
    for (Future future : futures) {
        total += future.get(); // 返回任务的结果
    }

    System.out.println("Total: " + total);

    threadPool.shutdown(); // 向线程池发送关闭的指令
}

运行结果:

线程池是很强大而且很方便的工具,它提供了对线程进行统一的分配和调控的各种功能。自 JDK1.5 时 JDK 添加了线程池的功能之后,一般情况下更推荐使用线程池来编写多线程程序,而不是直接使用 Thread

invokeAny 也是很实用的方法,请有兴趣的读者自己实践)

References:

http://www.infoq.com/cn/artic...

http://www.cnblogs.com/absfre...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66387.html

相关文章

  • Java线程学习(八)线程与Executor 框架

    摘要:一使用线程池的好处线程池提供了一种限制和管理资源包括执行一个任务。每个线程池还维护一些基本统计信息,例如已完成任务的数量。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。使用无界队列作为线程池的工作队列会对线程池带来的影响与相同。 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理论 可能是最漂亮的Spring事务管理详解 面试中关于Java虚拟机(jvm)的问...

    cheng10 评论0 收藏0
  • (十七)java线程ThreadPoolExecutor

    摘要:本人邮箱欢迎转载转载请注明网址代码已经全部托管有需要的同学自行下载引言在之前的例子我们要创建多个线程处理一批任务的时候我是通过创建线程数组或者使用线程集合来管理的但是这样做不太好因为这些线程没有被重复利用所以这里要引入线程池今天我们就讲线程 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    wpw 评论0 收藏0
  • Java线程简单总结

    摘要:本文主要内容为简单总结中线程池的相关信息。方法簇方法簇用于创建固定线程数的线程池。三种常见线程池的对比上文总结了工具类创建常见线程池的方法,现对三种线程池区别进行比较。 概述 线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。在单核处理器架构下,操作系统一般使用分时的方式实现多线程;在多核处理器架构下,多个线程能够...

    CoorChice 评论0 收藏0
  • Java线程使用到阅读源码(3/10)

    摘要:最后,我们会通过对源代码的剖析深入了解线程池的运行过程和具体设计,真正达到知其然而知其所以然的水平。创建线程池既然线程池是一个类,那么最直接的使用方法一定是一个类的对象,例如。单线程线程池单线程线程 我们一般不会选择直接使用线程类Thread进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,我们只要在我们有需要的时候去获取就可以了。甚至可以说线程池更棒,...

    468122151 评论0 收藏0
  • Java 线程的认识和使用

    摘要:用于限定中线程数的最大值。该线程池中的任务队列维护着等待执行的对象。线程池和消息队列笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。以上是笔者在学习实践之后对于多线程和消息队列的粗浅认识,初学者切莫混淆两者的作用。 多线程编程很难,难点在于多线程代码的执行不是按照我们直觉上的执行顺序。所以多线程编程必须要建立起一个宏观的认识。 线程池是多线程编程中的一个重要概念。为了能够更...

    mgckid 评论0 收藏0

发表评论

0条评论

darry

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<