资讯专栏INFORMATION COLUMN

java线程池的原理学习(三)

mgckid / 1553人阅读

摘要:接上文线程池的原理学习二深入剖析线程池的五种状态类中将线程状态分为了以下五种可以接受新任务并且处理进入队列中的任务不接受新任务,但是仍然执行队列中的任务不接受新任务也不执行队列中的任务所有任务中止,队列为空,进入该状态下的任务会执行方法方法

接上文:java线程池的原理学习(二)

ThreadPoolExecutor深入剖析 线程池的五种状态

ThreadPoolExecutor 类中将线程状态( runState)分为了以下五种:

RUNNING:可以接受新任务并且处理进入队列中的任务
SHUTDOWN:不接受新任务,但是仍然执行队列中的任务
STOP:不接受新任务也不执行队列中的任务
TIDYING:所有任务中止,队列为空,进入该状态下的任务会执行 terminated()方法
TERMINATEDterminated()方法执行完成后进入该状态

状态之间的转换

RUNNING -> SHUTDOWN

调用了 shutdown()方法,可能是在 finalize()方法中被隐式调用

(RUNNING or SHUTDOWN) -> STOP

调用 shutdownNow()

SHUTDOWN -> TIDYING

当队列和线程池都为空时

STOP -> TIDYING

线程池为空时

TIDYING -> TERMINATED

terminated()方法执行完成

线程池状态实现

如果查看 ThreadPoolExecutor的源码,会发现开头定义了这几个变量来代表线程状态和活动线程的数量:

    //原子变量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

这个类中将二进制数分为了两部分,高位代表线程池状态( runState),低位代表活动线程数( workerCount), CAPACITY代表最大的活动线程数,为2^29-1,下面为了更直观的看到这些数我做了些打印:

public class Test1 {
    public static void main(String[] args) {
            final int COUNT_BITS = Integer.SIZE - 3;
            final int CAPACITY   = (1 << COUNT_BITS) - 1;

            final int RUNNING    = -1 << COUNT_BITS;
            final int SHUTDOWN   =  0 << COUNT_BITS;
            final int STOP       =  1 << COUNT_BITS;
            final int TIDYING    =  2 << COUNT_BITS;
            final int TERMINATED =  3 << COUNT_BITS;
            
            System.out.println(Integer.toBinaryString(CAPACITY));
            System.out.println(Integer.toBinaryString(RUNNING));
            System.out.println(Integer.toBinaryString(SHUTDOWN));
            System.out.println(Integer.toBinaryString(STOP));
            System.out.println(Integer.toBinaryString(TIDYING));
            System.out.println(Integer.toBinaryString(TERMINATED));
    }
}

输出:

11111111111111111111111111111
11100000000000000000000000000000
0
100000000000000000000000000000
1000000000000000000000000000000
1100000000000000000000000000000

打印的时候会将高位0省略
可以看到,第一行代表线程容量,后面5行提取高3位得到:

111 - RUNNING
000 - SHUTDOWN
001 - STOP
010 - TIDYING
011 - TERMINATED

分别对应5种状态,可以看到这样定义之后,只需要通过简单的移位操作就可以进行状态的转换。

重要方法

execute方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        /**分三步执行
         * 如果workerCount=corePoolSize,判断线程池是否处于运行状态,再将任务加入队列
         * */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();     //用于double check
            //如果线程池处于非运行态,则将任务从缓存队列中删除
            if (! isRunning(recheck) && remove(command)) 
                reject(command);  //拒绝任务
            else if (workerCountOf(recheck) == 0) //如果活动线程数为0,则创建新线程
                addWorker(null, false);
        }
        //如果线程池不处于RUNNING状态,或者workQueue满了,则执行以下代码
        else if (!addWorker(command, false))
            reject(command);
    }

可以看到,在类中使用了 Work类来代表任务,下面是 Work类的简单摘要:

private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        ...

Work类实现了 Runnable接口,使用了线程工厂创建线程,使用 runWork方法来运行任务
创建新线程时用到了 addWorker()方法:

/**
     * 检查在当前线程池状态和限制下能否创建一个新线程,如果可以,会相应改变workerCount,
     * 每个worker都会运行他们的firstTask
     * @param firstTask 第一个任务
     * @param core true使用corePoolSize作为边界,false使用maximumPoolSize
     * @return false 线程池关闭或者已经具备关闭的条件或者线程工厂没有创建新线程
     */
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 只有当rs < SHUTDOWN才有可能接受新任务
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c); //工作线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) //不合法则返回
                    return false;
                if (compareAndIncrementWorkerCount(c)) //将工作线程数量+1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) //判断线程池状态有没有改变,改变了则进行外循环,否则只进行内循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //创建新线程
        Worker w = new Worker(firstTask);
        Thread t = w.thread;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //再次检查状态,防止ThreadFactory创建线程失败或者状态改变了
            int c = ctl.get();
            int rs = runStateOf(c);

            if (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();  //减少线程数量
                tryTerminate();//尝试中止线程
                return false;
            }

            workers.add(w);//添加到工作线程Set集合中

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }

        t.start();//执行任务
       //状态变成了STOP(调用了shutdownNow方法)
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();

        return true;
    }

再看 Work中runWork方法:

final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;//线程是否异常中止
        try {
            //先取firstTask,再从队列中取任务直到为null
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);//实现钩子方法
                    Throwable thrown = null;
                    try {
                        task.run();//运行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//实现钩子方法
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;//成功运行,说明没有异常中止
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64431.html

相关文章

  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • 技术经理:求求你,别再乱改数据库连接池的大小了!

    摘要:你仅仅需要一个大小为数据库连接池,然后让剩下的业务线程都在队列里等待就可以了。你应该经常会看到一些用户量不是很大的应用中,为应付大约十来个的并发,却将数据库连接池设置成,的情况。请不要过度配置您的数据库连接池的大小。 文章翻译整理自: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing欢迎关注个人微信公众...

    darkbug 评论0 收藏0
  • 技术经理:求求你,别再乱改数据库连接池的大小了!

    摘要:你仅仅需要一个大小为数据库连接池,然后让剩下的业务线程都在队列里等待就可以了。你应该经常会看到一些用户量不是很大的应用中,为应付大约十来个的并发,却将数据库连接池设置成,的情况。请不要过度配置您的数据库连接池的大小。 文章翻译整理自: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing欢迎关注个人微信公众...

    苏丹 评论0 收藏0
  • java线程池的原理学习(二)

    摘要:接上文线程池的原理学习简单介绍,线程池类,继承自构造方法提供了四种构造方法实现这里只介绍一种有必要对每个参数解释一下池中所保存的线程数,包括空闲线程。文档中提供了一个可以暂停和恢复的线程池例子运行原理线程池的原理学习三 接上文:java线程池的原理学习 ThreadPoolExecutor简单介绍 ThreadPoolExecutor,线程池类,继承自 AbstractExecutor...

    oujie 评论0 收藏0

发表评论

0条评论

mgckid

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<