资讯专栏INFORMATION COLUMN

支持生产阻塞的线程池

yunhao / 1773人阅读

摘要:参考类似的思路,最简单的做法,我们可以直接定义一个,当队列满时改为调用来实现生产者的阻塞这样,我们就无需再关心和的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

在各种并发编程模型中,生产者-消费者模式大概是最常用的了。在实际工作中,对于生产消费的速度,通常需要做一下权衡。通常来说,生产任务的速度要大于消费的速度。一个细节问题是,队列长度,以及如何匹配生产和消费的速度。

一个典型的生产者-消费者模型如下:

在并发环境下利用J.U.C提供的Queue实现可以很方便地保证生产和消费过程中的线程安全。这里需要注意的是,Queue必须设置初始容量,防止生产者生产过快导致队列长度暴涨,最终触发OutOfMemory。

对于一般的生产快于消费的情况。当队列已满时,我们并不希望有任何任务被忽略或得不到执行,此时生产者可以等待片刻再提交任务,更好的做法是,把生产者阻塞在提交任务的方法上,待队列未满时继续提交任务,这样就没有浪费的空转时间了。阻塞这一点也很容易,BlockingQueue就是为此打造的,ArrayBlockingQueue和LinkedBlockingQueue在构造时都可以提供容量做限制,其中LinkedBlockingQueue是在实际操作队列时在每次拿到锁以后判断容量。

更进一步,当队列为空时,消费者拿不到任务,可以等一会儿再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,当有任务时便可以立即获得执行,建议调用take的带超时参数的重载方法,超时后线程退出。这样当生产者事实上已经停止生产时,不至于让消费者无限等待。

于是一个高效的支持阻塞的生产消费模型就实现了。

等一下,既然J.U.C已经帮我们实现了线程池,为什么还要采用这一套东西?直接用ExecutorService不是更方便?

我们来看一下ThreadPoolExecutor的基本结构:

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已经帮我们实现好了,并且直接采用线程池的实现还有很多优势,例如线程数的动态调整等。

但问题在于,即便你在构造ThreadPoolExecutor时手动指定了一个BlockingQueue作为队列实现,事实上当队列满时,execute方法并不会阻塞,原因在于ThreadPoolExecutor调用的是BlockingQueue非阻塞的offer方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

这时候就需要做一些事情来达成一个结果:当生产者提交任务,而队列已满时,能够让生产者阻塞住,等待任务被消费。

关键在于,在并发环境下,队列满不能由生产者去判断,不能调用ThreadPoolExecutor.getQueue().size()来判断队列是否满。

线程池的实现中,当队列满时会调用构造时传入的RejectedExecutionHandler去拒绝任务的处理。默认的实现是AbortPolicy,直接抛出一个RejectedExecutionException。

几种拒绝策略在这里就不赘述了,这里和我们的需求比较接近的是CallerRunsPolicy,这种策略会在队列满时,让提交任务的线程去执行任务,相当于让生产者临时去干了消费者干的活儿,这样生产者虽然没有被阻塞,但提交任务也会被暂停。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a CallerRunsPolicy.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller"s thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // should not be interrupted
            }
        }
    }
};

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。


via ifeve

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64042.html

相关文章

  • 想进大厂?50个多线程面试题,你会多少?(一)

    摘要:下面是线程相关的热门面试题,你可以用它来好好准备面试。线程安全问题都是由全局变量及静态变量引起的。持有自旋锁的线程在之前应该释放自旋锁以便其它线程可以获得自旋锁。 最近看到网上流传着,各种面试经验及面试题,往往都是一大堆技术题目贴上去,而没有答案。 不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员...

    wow_worktile 评论0 收藏0
  • 使用 Executors,ThreadPoolExecutor,创建线程,源码分析理解

    摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。 之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 评论0 收藏0
  • Java SDK 并发包全面总结

    摘要:一和并发包中的和主要解决的是线程的互斥和同步问题,这两者的配合使用,相当于的使用。写锁与读锁之间互斥,一个线程在写时,不允许读操作。的注意事项不支持重入,即不可反复获取同一把锁。没有返回值,也就是说无法获取执行结果。 一、Lock 和 Condition Java 并发包中的 Lock 和 Condition 主要解决的是线程的互斥和同步问题,这两者的配合使用,相当于 synchron...

    luckyyulin 评论0 收藏0
  • 线程中你不容错过一些细节

    摘要:第二还是大家对线程池的理解不够深刻,比如今天要探讨的内容。我认为线程池它就是一个调度任务的工具。而在线程池这个场景中却恰好就是要利用它只是一个普通方法调用。 showImg(https://segmentfault.com/img/remote/1460000018653817); 背景 上周分享了一篇《一个线程罢工的诡异事件》,最近也在公司内部分享了这个案例。 无独有偶,在内部分享的...

    kgbook 评论0 收藏0
  • Java并发编程笔记(二)

    摘要:本文探讨并发中的其它问题线程安全可见性活跃性等等。当闭锁到达结束状态时,门打开并允许所有线程通过。在从返回时被叫醒时,线程被放入锁池,与其他线程竞争重新获得锁。 本文探讨Java并发中的其它问题:线程安全、可见性、活跃性等等。 在行文之前,我想先推荐以下两份资料,质量很高:极客学院-Java并发编程读书笔记-《Java并发编程实战》 线程安全 《Java并发编程实战》中提到了太多的术语...

    NickZhou 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<