资讯专栏INFORMATION COLUMN

ThreadPoolExecutor源码阅读

Meathill / 1916人阅读

摘要:介绍线程池一般包含三个主要部分调度器决定由哪个线程来执行任务执行任务所能够的最大耗时等线程队列存放并管理着一系列线程这些线程都处于阻塞状态或休眠状态任务队列存放着用户提交的需要被执行的任务一般任务的执行的即先提交的任务先被执行调度器并非是必

介绍

线程池一般包含三个主要部分:

调度器: 决定由哪个线程来执行任务, 执行任务所能够的最大耗时等

线程队列: 存放并管理着一系列线程, 这些线程都处于阻塞状态或休眠状态

任务队列: 存放着用户提交的需要被执行的任务. 一般任务的执行 FIFO 的, 即先提交的任务先被执行

调度器并非是必须的, 例如 Java 中实现的 ThreadPoolExecutor 就没有调度器, 而是所有的线程都不断从任务队列中取出任务, 然后执行.线程池模型可以用下图简单地表示

构造函数
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

一般构造函数包含了最主要的成员变量,我们来看看几个参数

corePoolSize:线程池的最小线程数量

maximumPoolSize:线程池的最大线程数量

workQueue:任务队列

threadFactory:产生线程的工厂

keepAliveTime:允许的最大idle时间

handler:拒绝执行处理器
这些是可控制的线程池的参数

线程池的状态

RUNNING: 接受新任务并处理队列中的任务

SHUTDOWN: 不接受新任务,但是处理队列中的任务

STOP: 不接受新任务,也不处理队列中的任务,还会中断已经进行中的任务

TIDYING: 所有任务已经执行完毕,工作线程数量为0,线程池状态转换为TIDYING,terminate方法被出触发。

TERMINATED: terminated() 执行完毕

状态转换

RUNNING -> SHUTDOWN:shutdown()

(RUNNING or SHUTDOWN) -> STOP:shutdownNow()

SHUTDOWN -> TIDYING:线程池和任务队列都为空

STOP -> TIDYING:线程池为空

TIDYING -> TERMINATED:terminated()

  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

状态码是由一个32位的原子Int的前三位表示的,后三位表示工作线程的数量

线程存在哪里?

真正的工作线程封装成一个内部类Worker,存放在HashSet中

private final HashSet workers = new HashSet();

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
        /** 真正的工作线程 */
        final Thread thread;
        /** 要执行的任务 */
        Runnable firstTask;
        /** 已经完成的任务计数器 */
        volatile long completedTasks;
      
        Worker(Runnable firstTask) {
            setState(-1); // 阻止中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 只是一个家的代理,真实的执行方法在runWorker里  */
        public void run() {
            runWorker(this);
        }

        //下面就是一个不可重入的排他锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
submit()

ThreadPoolExecutor实现了ExecutorService接口

// Class:ExecutorService
// 提交一个待执行的Runnable任务,并返回Future
 Future submit(Callable task);

 Future submit(Runnable task, T result);

Future submit(Runnable task);
    public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public  Future submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    
      protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        return new FutureTask(runnable, value);
    }

再看ThreadPoolExcutor的实现,submit方法都委托给execute执行了。

当我们通过 execute(Runnable) 提交一个任务时:

如果此时线程池中线程个数小于 corePoolSize, 则此任务不会插入到任务队列中, 而是直接创建一个新的线程来执行此任务, 即使当前线程池中有空闲的线程.

如果线程数大于 corePoolSize 但是小于 maximumPoolSize:

如果任务队列还未满, 则会将此任务插入到任务队列末尾;

如果此时任务队列已满, 则会创建新的线程来执行此任务.

如果线程数等于 maximumPoolSize:

如果任务队列还未满, 则会将此任务插入到任务队列末尾;

如果此时任务队列已满, 则会又 RejectedExecutionHandler 处理, 默认情况下是抛出 RejectedExecutionException 异常.

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
  
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

上面的代码有三个步骤, 首先第一步是检查当前线程池的线程数是否小于 corePoolSize, 如果小于, 那么由我们前面提到的规则, 线程池会创建一个新的线程来执行此任务, 因此在第一个 if 语句中, 会调用 addWorker(command, true) 来创建一个新 Worker 线程, 并执行此任务. addWorker 的第二个参数是一个 boolean 类型的, 它的作用是用于标识是否需要使用 corePoolSize 字段, 如果它为真, 则添加新任务时, 需要考虑到 corePoolSize 字段的影响. 这里至于 addWorker 内部的实现细节我们暂且不管, 先把整个提交任务的大体脉络理清了再说.

如果前面的判断不满足, 那么会将此任务插入到工作队列中, 即 workQueue.offer(command). 当然, 为了健壮性考虑, 当插入到 workQueue 后, 我们还需要再次检查一下此时线程池是否还是 RUNNING 状态, 如果不是的话就会将原来插入队列中的那个任务删除, 然后调用 reject 方法拒绝此任务的提交; 接着考虑到在我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了, 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程.
如果前面所分析的情况都不满足, 那么就会进入到第三个 if 判断, 在这里会调用 addWorker(command, false) 来将此任务提交到线程池中. 注意到这个方法的第二个参数是 false, 表示我们在此次调用 addWorker 时, 不考虑 corePoolSize 的影响, 即忽略 corePoolSize 字段.

addWorker 方法

前面我们大体分析了一下 execute 提交任务的流程, 不过省略了一个关键步骤, 即 addWorker 方法. 现在我们就来揭开它的神秘面纱吧.
首先看一下 addWorker 方法的签名:

private boolean addWorker(Runnable firstTask, boolean core)

这个方法接收两个参数, 第一个是一个 Runnable 类型的, 一般来说是我们调用 execute 方法所传输的参数, 不过也有可能是 null 值, 这样的情况我们在前面一小节中也见到过.
那么第二个参数是做什么的呢? 第二个参数是一个 boolean 类型的变量, 它的作用是标识是否使用 corePoolSize 属性. 我们知道, ThreadPoolExecutor 中, 有一个 corePoolSize 属性, 用于动态调整线程池中的核心线程数. 那么当 core 这个参数是 true 时, 则表示在添加新任务时, 需要考虑到 corePoolSzie 的影响(例如如果此时线程数已经大于 corePoolSize 了, 那么就不能再添加新线程了); 当 core 为 false 时, 就不考虑 corePoolSize 的影响(其实代码中是以 maximumPoolSize 作为 corePoolSize 来做判断条件的), 一有新任务, 就对应地生成一个新的线程.
说了这么多, 还不如来看一下 addWorker 的源码吧:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 这里一大段的 for 语句, 其实就是判断和处理 core 参数的.
    // 当经过判断, 如果当前的线程大于 corePoolSize 或 maximumPoolSize 时(根据 core 的值来判断), 
    // 则表示不能新建新的 Worker 线程, 此时返回 false.
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 当 core 为真, 那么就判断当前线程是否大于 corePoolSize
            // 当 core 为假, 那么就判断当前线程数是否大于 maximumPoolSize
            // 这里的 for 循环是一个自旋CAS(CompareAndSwap)操作, 用于确保多线程环境下的正确性
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : ma))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

首先在 addWorker 的一开始, 有一个 for 循环, 用于判断当前是否可以添加新的 Worker 线程. 它的逻辑如下:

如果传入的 core 为真, 那么判断当前的线程数是否大于 corePoolSize, 如果大于, 则不能新建 Worker 线程, 返回 false.

如果传入的 core 为假, 那么判断当前的线程数是否大于 maximumPoolSize, 如果大于, 则不能新建 Worker 线程, 返回 false.

如果条件符合, 那么在 for 循环内, 又有一个自旋CAS 更新逻辑, 用于递增当前的线程数, 即 compareAndIncrementWorkerCount(c), 这个方法会原子地更新 ctl 的值, 将当前线程数的值递增一.

addWorker 接下来有一个 try...finally 语句块, 这里就是实际上的创建线程、启动线程、添加线程到线程池中的工作了.
接下来我们看任务的真正执行runWorker

runWorker方法

语义:执行runWorker,调用Runnable的run方法,再其外围包装了中断的策略

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果线程池正在停止,确保其中断 
                // 如果不是,确保其不中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
reject

语义:提交不了线程池时,拒绝策略
接下来我们看线程提交被拒绝的策略reject

 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

RejectedExecutionHandler 接口有四种实现

AbortPolicy;拒绝,抛出RejectedExecutionException

CallerRunsPolicy:如果被拒绝,在当前线程执行任务,

RejectedExecutionHandler:静静的拒绝,什么都不做

DiscardOldestPolicy:丢弃最老的未处理的请求,重试提交当前请求

shutdown

语义:不接受新任务,但是处理队列中的任务
shutdown方法主要就是设置线程池状态,设置空闲的worker的中断

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
shutdownNow

shutdownNow:不接受新任务,也不处理队列中的任务,还会中断已经进行中的任务
shutdownNow方法主要就是设置线程池状态,设置所有worker的中断

    public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

本文严重参考Java ThreadPoolExecutor 线程池源码分析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70095.html

相关文章

  • 线程池运行模型源码全解析

    摘要:那么线程池到底是怎么利用类来实现持续不断地接收提交的任务并执行的呢接下来,我们通过的源代码来一步一步抽丝剥茧,揭开线程池运行模型的神秘面纱。 在上一篇文章《从0到1玩转线程池》中,我们了解了线程池的使用方法,以及向线程池中提交任务的完整流程和ThreadPoolExecutor.execute方法的源代码。在这篇文章中,我们将会从头阅读线程池ThreadPoolExecutor类的源代...

    MockingBird 评论0 收藏0
  • Java线程池从使用到阅读源码(3/10)

    摘要:最后,我们会通过对源代码的剖析深入了解线程池的运行过程和具体设计,真正达到知其然而知其所以然的水平。创建线程池既然线程池是一个类,那么最直接的使用方法一定是一个类的对象,例如。单线程线程池单线程线程 我们一般不会选择直接使用线程类Thread进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,我们只要在我们有需要的时候去获取就可以了。甚至可以说线程池更棒,...

    468122151 评论0 收藏0
  • 从0到1玩转线程池

    摘要:提交任务当创建了一个线程池之后我们就可以将任务提交到线程池中执行了。提交任务到线程池中相当简单,我们只要把原来传入类构造器的对象传入线程池的方法或者方法就可以了。 我们一般不会选择直接使用线程类Thread进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,我们只要在我们有需要的时候去获取就可以了。甚至可以说线程池更棒,我们只需要把任务提交给它,它就会在合...

    darkerXi 评论0 收藏0
  • 你真的懂ThreadPoolExecutor线程池技术吗?看了源码你会有全新的认识

    摘要:参数说明,线程池保留的最小线程数。,线程池中允许拥有的最大线程数。,线程池的运行状态。除非线程池状态发生了变化,发退回到外层循环重新执行,判断线程池的状态。是线程池的核心控制状态,包含的线程池运行状态和有效线程数。 Java是一门多线程的语言,基本上生产环境的Java项目都离不开多线程。而线程则是其中最重要的系统资源之一,如果这个资源利用得不好,很容易导致程序低效率,甚至是出问题。 有...

    JerryC 评论0 收藏0
  • 解读线程池

    摘要:为了让大家理解线程池的整个设计方案,我会按照的设计思路来多说一些相关的东西。也是因为线程池的需要,所以才有了这个接口。 线程池是非常重要的工具,如果你要成为一个好的工程师,还是得比较好地掌握这个知识。即使你为了谋生,也要知道,这基本上是面试必问的题目,而且面试官很容易从被面试者的回答中捕捉到被面试者的技术水平。 本文略长,建议在 pc 上阅读,边看文章边翻源码(Java7 和 Java...

    imccl 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<