资讯专栏INFORMATION COLUMN

java并发编程学习之线程池-ThreadPoolExecutor(三)

阿罗 / 1837人阅读

摘要:是所有线程池实现的父类,我们先看看构造函数构造参数线程核心数最大线程数线程空闲后,存活的时间,只有线程数大于的时候生效存活时间的单位任务的阻塞队列创建线程的工程,给线程起名字当线程池满了,选择新加入的任务应该使用什么策略,比如抛异常丢弃当前

ThreadPoolExecutor

ThreadPoolExecutor是所有线程池实现的父类,我们先看看构造函数

构造参数

corePoolSize:线程核心数

maximumPoolSize:最大线程数

keepAliveTime:线程空闲后,存活的时间,只有线程数大于corePoolSize的时候生效

unit:存活时间的单位

workQueue:任务的阻塞队列

threadFactory:创建线程的工程,给线程起名字

handler:当线程池满了,选择新加入的任务应该使用什么策略,比如抛异常、丢弃当前任务、丢弃阻塞队列的最老任务等,也可以自定义。

流程

判断是否超过线程核心数corePoolSize,没超过创建线程

超过线程核心数,则判断队列是否已满,没有满,放入队列

队列也满了,判断是否超过maximumPoolSize,没有就创建线程

超过了,根据策略执行

源码解析

</>复制代码

  1. //32为,前3位作为线程池的状态,后三位是线程数
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. private static final int COUNT_BITS = Integer.SIZE - 3;//28
  4. private static final int CAPACITY = (1 << COUNT_BITS) - 1;00011111 11111111 11111111 11111110
  5. //-1的二进制是11111111 11111111 11111111 11111111
  6. private static final int RUNNING = -1 << COUNT_BITS;//-1如上,左移28位后,就是111000000 00000000 00000000 00000000
  7. private static final int SHUTDOWN = 0 << COUNT_BITS;//0左移28位,还是0,00000000 00000000 00000000 00000000
  8. private static final int STOP = 1 << COUNT_BITS;//00100000 00000000 00000000 00000000
  9. private static final int TIDYING = 2 << COUNT_BITS;//01000000 00000000 00000000 00000000
  10. private static final int TERMINATED = 3 << COUNT_BITS;//01100000 00000000 00000000 00000000
  11. private static int runStateOf(int c) { return c & ~CAPACITY; }//~CAPACITY为11100000000000000000000000000000,与完就是线程的状态
  12. private static int workerCountOf(int c) { return c & CAPACITY; }//与完,是线程的数量
  13. private static int ctlOf(int rs, int wc) { return rs | wc; }
  14. private static boolean isRunning(int c) {
  15. return c < SHUTDOWN;//小于0,说明是RUNNING,RUNNING=-1
  16. }
execute方法

</>复制代码

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {//如果线程数少于线程核心数
  6. if (addWorker(command, true))//增加任务成功,返回true,没成功,继续往下
  7. return;
  8. c = ctl.get();
  9. }
  10. //判断队列
  11. if (isRunning(c) && workQueue.offer(command)) {//如果线程池还在跑,并且可以插入队列
  12. int recheck = ctl.get();
  13. if (! isRunning(recheck) && remove(command))//线程池不是运行状态,就移除刚刚插入的任务
  14. reject(command);//执行策略
  15. else if (workerCountOf(recheck) == 0)//
  16. addWorker(null, false);
  17. }
  18. //队列也满了,判断最大线程数
  19. else if (!addWorker(command, false))
  20. reject(command);//执行策略
  21. }
addWorker方法

</>复制代码

  1. private boolean addWorker(Runnable firstTask, boolean core) {//core为true,使用corePoolSize判断,否则使用maximumPoolSize
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);//获取当前线程状态
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && // 就是STOP、TIDYING、TERMINATED,此时不让任务进来
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))//
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;//超过了线程核心数或最大线程数,不让新增
  17. if (compareAndIncrementWorkerCount(c))//返回true,说明成功了,跳出retry循环
  18. break retry;
  19. //失败了,说明被其他符号条件的线程占了,就再判断线程状态是否跟之前一样,不一样重新获取,跳到retry
  20. c = ctl.get(); // Re-read ctl
  21. if (runStateOf(c) != rs)
  22. continue retry;
  23. // else CAS failed due to workerCount change; retry inner loop
  24. }
  25. }
  26. boolean workerStarted = false;
  27. boolean workerAdded = false;
  28. Worker w = null;
  29. try {
  30. w = new Worker(firstTask);
  31. final Thread t = w.thread;
  32. if (t != null) {
  33. final ReentrantLock mainLock = this.mainLock;
  34. mainLock.lock();//获取锁
  35. try {
  36. int rs = runStateOf(ctl.get());//获取线程池的状态
  37. if (rs < SHUTDOWN ||
  38. (rs == SHUTDOWN && firstTask == null)) {
  39. if (t.isAlive()) // 没通过start来启动run的
  40. throw new IllegalThreadStateException();
  41. workers.add(w);//加点hashset
  42. int s = workers.size();
  43. if (s > largestPoolSize)
  44. largestPoolSize = s;//更新当前最大值
  45. workerAdded = true;//增加成功
  46. }
  47. } finally {
  48. mainLock.unlock();
  49. }
  50. if (workerAdded) {
  51. t.start();//启动线程
  52. workerStarted = true;//启动成功
  53. }
  54. }
  55. } finally {
  56. if (! workerStarted)
  57. addWorkerFailed(w);//失败,线程数-1,从hashset移除,并尝试Terminate
  58. }
  59. return workerStarted;
  60. }
runWorker方法

上面执行 t.start();的时候,就会通过run方法调用下面的方法

</>复制代码

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {//任务不为空或者获取的任务也不为空
  9. w.lock();
  10. if ((runStateAtLeast(ctl.get(), STOP) ||
  11. (Thread.interrupted() &&
  12. runStateAtLeast(ctl.get(), STOP))) &&
  13. !wt.isInterrupted())
  14. wt.interrupt();
  15. try {
  16. beforeExecute(wt, task);
  17. Throwable thrown = null;
  18. try {
  19. task.run();//调用run方法,这里没有通过start,也就是说没有启动新线程
  20. } catch (RuntimeException x) {
  21. thrown = x; throw x;
  22. } catch (Error x) {
  23. thrown = x; throw x;
  24. } catch (Throwable x) {
  25. thrown = x; throw new Error(x);
  26. } finally {
  27. afterExecute(task, thrown);
  28. }
  29. } finally {
  30. task = null;
  31. w.completedTasks++;//完成任务数加1
  32. w.unlock();//释放
  33. }
  34. }
  35. completedAbruptly = false;
  36. } finally {
  37. processWorkerExit(w, completedAbruptly);//移除w,在task为空的时候,比如线程池状态停止或者启动的线程太多
  38. }
  39. }

getTask方法
当Worker第一次启动的时候,调用run方法,后面就一直从队列里获取任务

</>复制代码

  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);//获取当前线程池状态
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//
  8. decrementWorkerCount();//线程数量-1
  9. return null;
  10. }
  11. int wc = workerCountOf(c);//线程数
  12. //allowCoreThreadTimeOut为true,说明线程数要根据是否超过核心线程数判断keepAliveTime
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//是否超过核心线程数
  14. if ((wc > maximumPoolSize || (timed && timedOut))//超过了最大线程数
  15. && (wc > 1 || workQueue.isEmpty())) {
  16. if (compareAndDecrementWorkerCount(c))//线程数-1
  17. return null;//返回空
  18. continue;
  19. }
  20. try {
  21. Runnable r = timed ?
  22. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23. workQueue.take();//获取任务
  24. if (r != null)
  25. return r;
  26. timedOut = true;
  27. } catch (InterruptedException retry) {
  28. timedOut = false;
  29. }
  30. }
  31. }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75696.html

相关文章

  • java并发编程习之线程-预定义线程(四)

    摘要:系统预定了几个线程池,不过建议手动创建,以防止错误创建消耗资源,比如创建太多线程或者固定线程数量,无界队列固定线程数量,数量为,无界队列,会按顺序执行不限制线程数量,使用队列,使用于短任务基于用于周期性执行任务示例第一个是,第二个是第一 系统预定了几个线程池,不过建议手动创建,以防止错误创建消耗资源,比如创建太多线程或者OOM FixedThreadPool 固定线程数量,无界队列 p...

    suemi 评论0 收藏0
  • java并发编程习之线程-Executor和ExecutorService(一)

    摘要:接口用于提交任务接口继承了接口设置线程的状态,还没执行的线程会被中断设置线程的状态,尝试停止正在进行的线程当调用或方法后返回为当调用方法后,并且所有提交的任务完成后返回为当调用方法后,成功停止后返回为当前线程阻塞,直到线程执行完时间到被中断 Executor接口 void execute(Runnable command)//用于提交command任务 ExecutorService接...

    liuchengxu 评论0 收藏0
  • java并发编程习之线程-AbstractExecutorService(二)

    摘要:抽象类,实现了的接口。将任务封装成提交任务主要方法在任务是否超时超时时间任务书用于存放结果的,先完成的放前面。 AbstractExecutorService抽象类,实现了ExecutorService的接口。 newTaskFor 将任务封装成FutureTask protected RunnableFuture newTaskFor(Runnable runnable, T va...

    Jokcy 评论0 收藏0
  • java并发编程习之线程的生命周期-yield(

    摘要:方法作用让当前的线程状态从运行状态转到就绪状态,然后和其他就绪状态的同相同优先级的其他线程竞争的执行权。也就是说,这个线程,还是有机会继续再次执行的。 方法作用 让当前的线程状态从运行状态转到就绪状态,然后和其他就绪状态的同相同优先级的其他线程竞争CPU的执行权。也就是说,这个线程,还是有机会继续再次执行的。 优先权 优先级范围是1~10,数字越大,优先级越高,默认为5,但是由于操作系...

    Barry_Ng 评论0 收藏0
  • 跟着阿里p7一起java并发 - 第18天:玩转java线程,这一篇就够了

    摘要:高并发系列第篇文章。简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。如果调用了线程池的方法,线程池会提前把核心线程都创造好,并启动线程池允许创建的最大线程数。 java高并发系列第18篇文章。 本文主要内容 什么是线程池 线程池实现原理 线程池中常见的各种队列 自定义线程创建的工厂 常见的饱和策略 自定义饱和策略 ...

    AdolphLWQ 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<