资讯专栏INFORMATION COLUMN

追踪解析 ThreadPoolExecutor 源码

gaomysion / 2590人阅读

摘要:的前位数用来表示线程的数量,后面三位用来表示线程池的状态。线程池的状态有五种,分别是,根据单词就能猜出大概。并且为了考虑性能问题,线程池的设计没有使用悲观锁关键字,而是大量使用了和机制。

零 前期准备 0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadPoolExecutor 简介

ThreadPoolExecutor 是 jdk4 中加入的工具,被封装在 jdk 自带的 Executors 框架中,是 java 中最经典的线程池技术。

ThreadPoolExecutor 类在 concurrent 包下,和其它线程工具类一样都由 Doug Lea 大神操刀完成。

[ 在看完 Spring ioc 和 Gson 之后有点乏了,换换口味看一些 jdk 的源码 ]

3 Demo

</>复制代码

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPoolDemo {
  4. public static void main(String[] args){
  5. //创建线程池
  6. //这里使用固定线程数的线程池,线程数为 5
  7. ExecutorService executorService = Executors.newFixedThreadPool(5);
  8. for(int i = 0 ; i < 100 ; i ++){
  9. final int ii = i;
  10. //创建 Runnable 作为线程池的任务
  11. Runnable r = () -> System.out.println(ii);
  12. //执行
  13. executorService.execute(r);
  14. }
  15. }
  16. }
一 线程池的初始化

线程池的初始化调用的 Executors 框架的静态方法:

</>复制代码

  1. //Executors.class
  2. public static ExecutorService newFixedThreadPool(int nThreads) {
  3. return new ThreadPoolExecutor(nThreads, nThreads,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue());
  6. }

继续追踪这个构造方法:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue workQueue) {
  7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8. Executors.defaultThreadFactory(), defaultHandler);
  9. }

继续追踪:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue workQueue,
  7. ThreadFactory threadFactory,
  8. RejectedExecutionHandler handler) {
  9. //验证参数的有效性
  10. if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
  11. throw new IllegalArgumentException();
  12. if (workQueue == null || threadFactory == null || handler == null)
  13. throw new NullPointerException();
  14. //本例中不涉及权限
  15. this.acc = System.getSecurityManager() == null ?
  16. null :
  17. AccessController.getContext();
  18. //线程数
  19. this.corePoolSize = corePoolSize;
  20. //最大线程数
  21. //本例中使用固定线程数的线程池,所以线程数和最大线程数相等
  22. this.maximumPoolSize = maximumPoolSize;
  23. //用于存储任务的队列
  24. //此处使用 LinkedBlockingQueue 来储存任务,其线程安全
  25. this.workQueue = workQueue;
  26. //keepAliveTime 参数用于表示:
  27. //对于超出线程和队列缓存总和的任务,是否要临时增加线程来处理
  28. //超出的线程的存在时间是多少
  29. //这里使用的是定长线程池,所以 keepAliveTime = 0,即不增加线程
  30. this.keepAliveTime = unit.toNanos(keepAliveTime);
  31. //用于创建线程的工厂类
  32. this.threadFactory = threadFactory;
  33. //handler 用来处理 task 太多时候的拒绝策略
  34. //此例中使用的是默认的,即定义在 ThreadPoolExecutor 中的 defaultHandler 对象
  35. this.handler = handler;
  36. }
二 Worker

Worker 是 ThreadPoolExecutor 的内部类,可以看做是 Runnable 的代理类:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
  3. private static final long serialVersionUID = 6138294804551838833L;
  4. final Thread thread;
  5. Runnable firstTask;
  6. //完成 task 数量的计数器
  7. volatile long completedTasks;
  8. Worker(Runnable firstTask) {
  9. //这个方法是 AbstractQueuedSynchronizer 中的方法,功能相当于加锁
  10. //-1 的意思是后续的任务会处于阻塞状态,即为已经加锁
  11. setState(-1);
  12. //在创建的时候存入一个要处理的 task
  13. //需要注意的是每个 worker 对象被创建出来之后是可以重复利用来处理多个 task 的
  14. this.firstTask = firstTask;
  15. //worker 会用自身作为 Runnable 对象去创建一个线程
  16. //这里调用线程工厂进行线程创建
  17. this.thread = getThreadFactory().newThread(this);
  18. }
  19. //对于线程变量来说,其启动的就是 worker 的 run() 方法
  20. public void run() {
  21. //runWorker(...) 方法在 ThreadPoolExecutor 里
  22. runWorker(this);
  23. }
  24. //获取锁的状态
  25. protected boolean isHeldExclusively() {
  26. return getState() != 0;
  27. }
  28. //重写了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法
  29. //尝试加锁
  30. protected boolean tryAcquire(int unused) {
  31. if (compareAndSetState(0, 1)) {
  32. setExclusiveOwnerThread(Thread.currentThread());
  33. return true;
  34. }
  35. return false;
  36. }
  37. //重写了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法
  38. //尝试释放锁
  39. protected boolean tryRelease(int unused) {
  40. setExclusiveOwnerThread(null);
  41. setState(0);
  42. return true;
  43. }
  44. //真正的加锁方法
  45. public void lock() {
  46. acquire(1);
  47. }
  48. //尝试加锁
  49. public boolean tryLock() {
  50. return tryAcquire(1);
  51. }
  52. //真正的释放锁方法
  53. public void unlock() {
  54. release(1);
  55. }
  56. //判断是否在锁中
  57. public boolean isLocked() {
  58. return isHeldExclusively();
  59. }
  60. //中断线程
  61. void interruptIfStarted() {
  62. Thread t;
  63. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  64. try {
  65. t.interrupt();
  66. } catch (SecurityException ignore) {
  67. }
  68. }
  69. }
  70. }

追踪一下 runWorker(...) 方法:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. final void runWorker(Worker w) {
  3. //获取当前所在的线程的实例对象
  4. Thread wt = Thread.currentThread();
  5. //获取 task
  6. Runnable task = w.firstTask;
  7. //取出来之后把 task 置空
  8. w.firstTask = null;
  9. //此处释放锁
  10. w.unlock();
  11. //指示器,此变量为 true 的时候确认该方法已经执行完毕
  12. boolean completedAbruptly = true;
  13. try {
  14. //此处为一个 while 循环,用于不断的执行 task
  15. //getTask() 方法会从队列里不断抓取 task 并进行执行
  16. //当 task 为 null,且队列里已经没有更多 task 的时候,就会终止循环
  17. while (task != null || (task = getTask()) != null) {
  18. //加锁,独占线程
  19. w.lock();
  20. //在这里会判断线程的状态,如果存在符合中断的情况,就会直接中断掉
  21. if ((runStateAtLeast(ctl.get(), STOP)
  22. || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
  23. wt.interrupt();
  24. try {
  25. //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并没有实现
  26. //是预留出来给使用者重写,以达到业务需求的方法
  27. beforeExecute(wt, task);
  28. try {
  29. //此处执行 task
  30. task.run();
  31. afterExecute(task, null);
  32. } catch (Throwable ex) {
  33. afterExecute(task, ex);
  34. throw ex;
  35. }
  36. } finally {
  37. //将执行的 task 置空
  38. task = null;
  39. //每完成一个 task 就会加 1
  40. w.completedTasks++;
  41. //释放锁
  42. w.unlock();
  43. }
  44. }
  45. completedAbruptly = false;
  46. } finally {
  47. //这个方法会销毁掉 worker
  48. //同时如果检测到有新的 task 又会重新创建 Worker
  49. processWorkerExit(w, completedAbruptly);
  50. }
  51. }

Worker 是线程池中真正起完成业务逻辑的组件,是任务和线程的封装。

三 线程池的状态控制

线程池的状态主要由 ctl 变量来进行控制:

</>复制代码

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 是一个 AtomicInteger 类型的变量,其实可以简单理解为一个 int 值,AtomicInteger 只是能够适应高并发的原子化操作的需要。

ctl 的前 29 位数用来表示线程(Worker)的数量,后面三位用来表示线程池的状态。

线程池的状态有五种,分别是 Running、Shutdown、Stop、Tidying、Terminate,根据单词就能猜出大概。

注意的是,这五种状态在线程池中都以 int 变量的形式存在,从前到后依次变大,对状态的比较有一系列方法:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. private static boolean runStateLessThan(int c, int s) {
  3. //c 的状态值要小于 s
  4. return c < s;
  5. }
  6. //ThreadPoolExecutor.class
  7. private static boolean runStateAtLeast(int c, int s) {
  8. //c 的状态值要大于或等于 s
  9. return c >= s;
  10. }
  11. //ThreadPoolExecutor.class
  12. private static boolean isRunning(int c) {
  13. //状态里只有 RUNNING 是小于 SHUTDOWN 的
  14. return c < SHUTDOWN;
  15. }

在这些方法里,传入的参数 c 一般指的是当前线程池状态,s 是用来对比的参照状态。

四 线程池的执行

该 part 的起点:

</>复制代码

  1. executorService.execute(r);

来追踪 execute(...) 方法:

</>复制代码

  1. public void execute(Runnable command) {
  2. //有效性验证
  3. if (command == null)
  4. throw new NullPointerException();
  5. //ctl 是一个 AtomicInteger 类型的变量,用来记录线程池的状态
  6. int c = ctl.get();
  7. //workerCountOf(...) 方法会返回当前运行的 Worker 的数量
  8. if (workerCountOf(c) < corePoolSize) {
  9. //Worker 的数量小于线程池容量的情况下
  10. //直接增加 Worker 并取出 task 去运行
  11. if (addWorker(command, true))
  12. return;
  13. //如果 Worker 已经顺利执行了 task,应该会直接返回掉
  14. //如果执行中出现了其它情况,则会继续往下走
  15. //此处刷新状态
  16. c = ctl.get();
  17. }
  18. //当 Worker 数量已经达到线程池的指定数量,或者添加 Worker 的时候出问题的时候,会进入此判断语句
  19. //先判断线程池是否处于活跃状态,且 task 是否已经被成功添加到队列中
  20. //如果不满足,会进入 else 语句中,先最后尝试一次 addWorker(...) 方法,如果不成功就拒绝 task
  21. //reject(...) 方法会调用 handler 的拒绝策略
  22. if (isRunning(c) && workQueue.offer(command)) {
  23. int recheck = ctl.get();
  24. if (! isRunning(recheck) && remove(command))
  25. reject(command);
  26. else if (workerCountOf(recheck) == 0)
  27. addWorker(null, false);
  28. }else if (!addWorker(command, false))
  29. reject(command);
  30. }
1 reject

这里先提及一下 reject(...) 方法:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. final void reject(Runnable command) {
  3. handler.rejectedExecution(command, this);
  4. }

本质是调用了 handler 对象的相关方法。在本例中,handler 对象指向了 defaultHandler:

</>复制代码

  1. //ThreadPoolExecutor.class
  2. private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

defaultHandler 是一个 AbortPolicy 类型的对象,而 AbortPolicy 是 ThreadPoolExecutor 的静态内部类。

AbortPolicy 起作用的方法为 rejectedExecution(...) 方法:

</>复制代码

  1. //AbortPolicy.class
  2. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  3. throw new RejectedExecutionException("Task " + r.toString() +
  4. " rejected from " + e.toString());
  5. }

也就是说,在 task 过多的情况下,AbortPolicy 的应对策略是抛出异常。

2 addWorker

来看一下核心方法 addWorker(...):

</>复制代码

  1. //ThreadPoolExecutor.class
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3. //先标记这个 for 循环,方便退出循环
  4. retry:
  5. //在每一次循环开始之前会刷新一次状态标识
  6. for (int c = ctl.get();;) {
  7. //这里先进行判断,如果线程池已经关闭了,或者没有 task 了,就会返回 false
  8. if (runStateAtLeast(c, SHUTDOWN)
  9. && (runStateAtLeast(c, STOP)
  10. || firstTask != null
  11. || workQueue.isEmpty()))
  12. return false;
  13. for (;;) {
  14. //如果 Worker 数量已经超出了最大值就会直接返回 false
  15. if (workerCountOf(c)
  16. >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
  17. return false;
  18. //将 ctl 变量的值加 1,如果成功了就会跳出循环
  19. if (compareAndIncrementWorkerCount(c))
  20. break retry;
  21. c = ctl.get();
  22. //在状态值比 SHUTDOWN 大的时候会直接跳到最外头的循环里
  23. //需要注意的是最外面的 for 循环会判断状态值是否大于 SHUTDOWN
  24. //如果大于 SHUTDOWN 的话就返回 false
  25. if (runStateAtLeast(c, SHUTDOWN))
  26. continue retry;
  27. }
  28. }
  29. boolean workerStarted = false;
  30. boolean workerAdded = false;
  31. Worker w = null;
  32. try {
  33. //创建一个 Worker
  34. w = new Worker(firstTask);
  35. //获取线程对象
  36. final Thread t = w.thread;
  37. if (t != null) {
  38. //加锁,此处加的是一把全局的锁
  39. final ReentrantLock mainLock = this.mainLock;
  40. mainLock.lock();
  41. try {
  42. int c = ctl.get();
  43. //如果状态值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就会进入这个判断语句
  44. //
  45. if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
  46. //如果这个线程已经处于运作状态,会抛出异常
  47. if (t.isAlive())
  48. throw new IllegalThreadStateException();
  49. //workers 是一个列表,用于存储 Worker 对象
  50. workers.add(w);
  51. //获取 Worker 的数量
  52. int s = workers.size();
  53. //largestPoolSize 用来记录线程池达到过的最大线程数
  54. if (s > largestPoolSize)
  55. largestPoolSize = s;
  56. //标记 Worker 已经被添加
  57. workerAdded = true;
  58. }
  59. } finally {
  60. //释放锁
  61. mainLock.unlock();
  62. }
  63. //先判断 Worker 是否已经被添加到 workers 内了
  64. if (workerAdded) {
  65. //这是该方法核心的启动线程方法
  66. t.start();
  67. //标记 Worker 已经开始运行了
  68. workerStarted = true;
  69. }
  70. }
  71. } finally {
  72. //如果没有标记 Worker 已经开始工作,会在这里销毁掉 Worker
  73. if (!workerStarted)
  74. addWorkerFailed(w);
  75. }
  76. return workerStarted;
  77. }
五 一点唠叨

先总结一下线程池的业务逻辑:

</>复制代码

  1. 1 接收到 task (即实现了 Runnable 接口的实例对象) [execute(...) 方法]
  2. 2 用 task 去尝试创建一个 Worker 实例 [execute(...) 方法]
  3. 2.1 如果 Worker 数量没有达到线程池的指定最大值 -> 新建
  4. 2.2 如果 Worker 数量达到了线程池的指定最大值 -> 不会再创建,而是把 task 储存起来等待空闲的 Worker 去提取
  5. 2.3 如果 task 队列也已经满了,无法再添加 -> 触发拒绝机制(handler)
  6. 3 Worker 在执行的时候调用其内部的 Thread 实例对象的 start() 方法 [addWorker(...) 方法]
  7. 4 该 start() 方法会调用到 Worker 的 run() 方法 [Worker.class 内的 run() 方法]
  8. 5 Worker 的 run() 方法本质上是封装了 task 的 run() 方法 [runWorker(...) 方法]

主线业务逻辑不算复杂,比较艰难的是为了保证数据的一致性,线程池代码中充斥着大量的状态判断和锁机制。

并且为了考虑性能问题,线程池的设计没有使用悲观锁(synchronized 关键字),而是大量使用了 ASQ 和 ReetrentLock 机制。

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/73156.html

相关文章

  • 追踪解析 FutureTask 源码

    摘要:零前期准备文章异常啰嗦且绕弯。版本版本简介是中默认的实现类,常与结合进行多线程并发操作。所以方法的主体其实就是去唤醒被阻塞的线程。本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充 零 前期准备 0 FBI WARNING 文章异常啰嗦且绕弯。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 简介 ...

    xcc3641 评论0 收藏0
  • 线程池运行模型源码解析

    摘要:那么线程池到底是怎么利用类来实现持续不断地接收提交的任务并执行的呢接下来,我们通过的源代码来一步一步抽丝剥茧,揭开线程池运行模型的神秘面纱。 在上一篇文章《从0到1玩转线程池》中,我们了解了线程池的使用方法,以及向线程池中提交任务的完整流程和ThreadPoolExecutor.execute方法的源代码。在这篇文章中,我们将会从头阅读线程池ThreadPoolExecutor类的源代...

    MockingBird 评论0 收藏0
  • java线程池——ThreadPoolExecutor源码解析

    摘要:将线程池状态置为并不会立即停止,停止接收外部的任务,内部正在跑的任务和队列里等待的任务,会执行完,才真正停止。将线程池状态置为。 在Java中,我们经常使用的线程池就是ThreadPoolExecutor,此外还有定时的线程池ScheduledExecutorService(),但是需要注意的是Executors.newCachedThreadPool()的线程是没有上届的,在使用时,...

    TerryCai 评论0 收藏0
  • 源码解析Executors.newFixedThreadPool(int)

    摘要:创建一个线程池,具有固定线程数,运行在共享的无界队列中。固定线程数源码如下是的实现类。线程池中允许最大的线程数。如果线程数超过了核心线程数,过量的线程在关闭前等待新任务的最大时间。处理因为线程边界和队列容量导致的堵塞。 1.Executors.newFixedThreadPool(int nThreads):创建一个线程池,具有固定线程数,运行在共享的无界队列中。在大多数时候,线程会主...

    source 评论0 收藏0
  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<