资讯专栏INFORMATION COLUMN

Java 线程池艺术探索

lolomaco / 1722人阅读

摘要:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。创建一个定长线程池,支持定时及周期性任务执行。

转载请注明原创地址为:http://www.54tianzhisheng.cn/...

线程池

Wiki 上是这样解释的:Thread Pool

作用:利用线程池可以大大减少在创建和销毁线程上所花的时间以及系统资源的开销!

下面主要讲下线程池中最重要的一个类 ThreadPoolExecutor 。

ThreadPoolExecutor

ThreadPoolExecutor 构造器:

有四个构造器的,挑了参数最长的一个进行讲解。

七个参数:

corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止;

unit:参数keepAliveTime的时间单位(DAYS、HOURS、MINUTES、SECONDS 等);

workQueue:阻塞队列,用来存储等待执行的任务;

ArrayBlockingQueue (有界队列)

LinkedBlockingQueue (无界队列)

SynchronousQueue

threadFactory:线程工厂,主要用来创建线程

handler:拒绝处理任务的策略

AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。(默认这种)

DiscardPolicy:也是丢弃任务,但是不抛出异常

DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

CallerRunsPolicy:由调用线程处理该任务

重要方法:

execute():通过这个方法可以向线程池提交一个任务,交由线程池去执行;

shutdown():关闭线程池;

execute() 方法:

注:JDK 1.7 和 1.8 这个方法有点区别,下面代码是 1.8 中的。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
  //1、如果当前的线程数小于核心线程池的大小,根据现有的线程作为第一个 Worker 运行的线程,新建一个 Worker,addWorker 自动的检查当前线程池的状态和 Worker 的数量,防止线程池在不能添加线程的状态下添加线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
   //2、如果线程入队成功,然后还是要进行 double-check 的,因为线程在入队之后状态是可能会发生变化的
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
          // recheck 防止线程池状态的突变,如果突变,那么将 reject 线程,防止 workQueue 中增加新线程
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)//上下两个操作都有 addWorker 的操作,但是如果在workQueue.offer 的时候 Worker 变为 0,那么将没有 Worker 执行新的 task,所以增加一个 Worker.
                addWorker(null, false);
        }
   //3、如果 task 不能入队(队列满了),这时候尝试增加一个新线程,如果增加失败那么当前的线程池状态变化了或者线程池已经满了然后拒绝task
        else if (!addWorker(command, false))
            reject(command);
    }

其中调用了 addWorker() 方法:

private boolean addWorker(Runnable firstTask, boolean core) {// firstTask: 新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用 corePoolSize 作为上限,否则使用 maxmunPoolSize
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
   /**
    * rs!=Shutdown || fistTask!=null || workQueue.isEmpty
    * 如果当前的线程池的状态 > SHUTDOWN 那么拒绝 Worker 的 add 如果 =SHUTDOWN
    * 那么此时不能新加入不为 null 的 Task,如果在 workQueue 为 empty 的时候不能加入任何类型的 Worker,
    * 如果不为 empty 可以加入 task 为 null 的 Worker, 增加消费的 Worker
    */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
          //如果当前的数量超过了 CAPACITY,或者超过了 corePoolSize 和 maximumPoolSize(试 core 而定)
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
          //CAS 尝试增加线程数,如果失败,证明有竞争,那么重新到 retry。
                if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作;
                    break retry;
                c = ctl.get();  // Re-read ctl
               //判断当前线程池的运行状态,状态发生改变,重试 retry;
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);// Worker 为内部类,封装了线程和任务,通过 ThreadFactory 创建线程,可能失败抛异常或者返回 null
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                         // SHUTDOWN 以后的状态和 SHUTDOWN 状态下 firstTask 为 null,不可新增线程
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//记录最大线程数
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)
        }
        return workerStarted;
    }

示意图:

执行流程:

1、当有任务进入时,线程池创建线程去执行任务,直到核心线程数满为止

2、核心线程数量满了之后,任务就会进入一个缓冲的任务队列中

当任务队列为无界队列时,任务就会一直放入缓冲的任务队列中,不会和最大线程数量进行比较

当任务队列为有界队列时,任务先放入缓冲的任务队列中,当任务队列满了之后,才会将任务放入线程池,此时会与线程池中最大的线程数量进行比较,如果超出了,则默认会抛出异常。然后线程池才会执行任务,当任务执行完,又会将缓冲队列中的任务放入线程池中,然后重复此操作。

shutdown() 方法:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //判断是否可以操作目标线程
            checkShutdownAccess();
            //设置线程池状态为 SHUTDOWN, 此处之后,线程池中不会增加新 Task
            advanceRunState(SHUTDOWN);
            //中断所有的空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //转到 Terminate
        tryTerminate();
    }

参考资料:深入理解java线程池—ThreadPoolExecutor

JDK 自带四种线程池分析与比较

1、newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

2、newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

3、newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

4、newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

四种线程池其实内部方法都是调用的 ThreadPoolExecutor 类,只不过利用了其不同的构造器方法而已(传入自己需要传入的参数),那么利用这个特性,我们自己也是可以实现自己定义的线程池的。

自定义线程池

1、创建任务类

package com.zhisheng.thread.threadpool.demo;
/**
 * Created by 10412 on 2017/7/24.
 * 任务
 */
public class MyTask implements Runnable
{
    private int taskId;     //任务 id
    private String taskName;    //任务名字
    public int getTaskId() {
        return taskId;
    }
    public void setTaskId(int taskId) {
        this.taskId = taskId;
    }
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public MyTask(int taskId, String taskName) {
        this.taskId = taskId;
        this.taskName = taskName;
    }
    @Override
    public void run() {
        System.out.println("当前正在执行 ******   线程Id-->" + taskId + ",任务名称-->" + taskName);
        try {
            Thread.currentThread().sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程Id-->" + taskId + ",任务名称-->" + taskName + "   -----------   执行完毕!");
    }
}

2、自定义拒绝策略,实现 RejectedExecutionHandler 接口,重写 rejectedExecution 方法

package com.zhisheng.thread.threadpool.demo;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Created by 10412 on 2017/7/24.
 * 自定义拒绝策略,实现 RejectedExecutionHandler 接口
 */
public class RejectedThreadPoolHandler implements RejectedExecutionHandler
{
    public RejectedThreadPoolHandler() {
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("WARNING 自定义拒绝策略: Task " + r.toString() + " rejected from " + executor.toString());
    }
}

3、创建线程池

package com.zhisheng.thread.threadpool.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Created by 10412 on 2017/7/24.
 */
public class ThreadPool
{
    public static void main(String[] args) {
        //核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3,
        //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));

        //核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s, 无界队列,
        //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());

        //核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3, 使用自定义拒绝策略
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue(3), new RejectedThreadPoolHandler());

        for (int i = 1; i <= 10; i++) {
            //创建 10 个任务
            MyTask task = new MyTask(i, "任务" + i);
            //运行
            pool.execute(task);
            System.out.println("活跃的线程数:"+pool.getActiveCount() + ",核心线程数:" + pool.getCorePoolSize() + ",线程池大小:" + pool.getPoolSize() + ",队列的大小" + pool.getQueue().size());
        }

        //关闭线程池
        pool.shutdown();
    }
}

这里运行结果就不截图了,我在本地测试了代码是没问题的,感兴趣的建议还是自己跑一下,然后分析下结果是不是和前面分析的一样,如有问题,请在我博客下面评论!

总结

本文一开始讲了线程池的介绍和好处,然后分析了线程池中最核心的 ThreadPoolExecutor 类中构造器的七个参数的作用、类中两个重要的方法,然后在对比研究了下 JDK 中自带的四种线程池的用法和内部代码细节,最后写了一个自定义的线程池。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67500.html

相关文章

  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • Java 总结

    摘要:中的详解必修个多线程问题总结个多线程问题总结有哪些源代码看了后让你收获很多,代码思维和能力有较大的提升有哪些源代码看了后让你收获很多,代码思维和能力有较大的提升开源的运行原理从虚拟机工作流程看运行原理。 自己实现集合框架 (三): 单链表的实现 自己实现集合框架 (三): 单链表的实现 基于 POI 封装 ExcelUtil 精简的 Excel 导入导出 由于 poi 本身只是针对于 ...

    caspar 评论0 收藏0
  • 初读《Java并发编程的艺术》-第十章:Executor框架 -10.1 Executor框架简介

    摘要:线程的启动与销毁都与本地线程同步。操作系统会调度所有线程并将它们分配给可用的。框架的成员主要成员线程池接口接口接口以及工具类。创建单个线程的接口与其实现类用于表示异步计算的结果。参考书籍并发编程的艺术方腾飞魏鹏程晓明著 在java中,直接使用线程来异步的执行任务,线程的每次创建与销毁需要一定的计算机资源开销。每个任务创建一个线程的话,当任务数量多的时候,则对应的创建销毁开销会消耗大量...

    aisuhua 评论0 收藏0
  • 那些年我看过的书 —— 致敬我的大学生活 —— Say Good Bye !

    摘要:开头正式开启我入职的里程,现在已是工作了一个星期了,这个星期算是我入职的过渡期,算是知道了学校生活和工作的差距了,总之,尽快习惯这种生活吧。当时是看的廖雪峰的博客自己也用做爬虫写过几篇博客,不过有些是在前人的基础上写的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 开头 2017.08.21 正式开启我...

    xiaoqibTn 评论0 收藏0

发表评论

0条评论

lolomaco

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<