资讯专栏INFORMATION COLUMN

SynchronousQueue使用实例

LiangJ / 3403人阅读

摘要:此时,会新建一个新的工作者线程用于对这个入队列失败的任务进行处理假设此时线程池的大小还未达到其最大线程池大小。但此时需要限定线程池的最大大小为一个合理的有限值,而不是,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。

本文主要讲一下SynchronousQueue。

定义

SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。

如果以洗盘子的比喻为例,那么这就相当于没有盘架,而是将洗好的盘子直接放入下一个空闲的烘干机中。这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。(在传统的队列中,在一个工作单元可以交付之前,必须通过串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)

直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列——这种区别就好比将文件直接交给同事,还是将文件放到她的邮箱中并希望她能尽快拿到文件。

因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

实例
public class SynchronousQueueExample {

    static class SynchronousQueueProducer implements Runnable {

        protected BlockingQueue blockingQueue;
        final Random random = new Random();

        public SynchronousQueueProducer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = UUID.randomUUID().toString();
                    System.out.println("Put: " + data);
                    blockingQueue.put(data);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    static class SynchronousQueueConsumer implements Runnable {

        protected BlockingQueue blockingQueue;

        public SynchronousQueueConsumer(BlockingQueue queue) {
            this.blockingQueue = queue;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    String data = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()
                            + " take(): " + data);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {
        final BlockingQueue synchronousQueue = new SynchronousQueue();

        SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                synchronousQueue);
        new Thread(queueProducer).start();

        SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer1).start();

        SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                synchronousQueue);
        new Thread(queueConsumer2).start();

    }
}

插入数据的线程和获取数据的线程,交替执行

应用场景

Executors.newCachedThreadPool()

 /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue(),
                                      threadFactory);
    }

由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。

所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。

如果应用程序确实需要比较大的工作队列容量,而又想避免无界工作队列可能导致的问题,不妨考虑SynchronousQueue。SynchronousQueue实现上并不使用缓存空间。

使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

doc

A Guide to Java SynchronousQueue

SynchronousQueue Example in Java - Produer Consumer Solution

Java SynchronousQueue

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70453.html

相关文章

  • 图解SynchronousQueue原理详解-公平模式

    摘要:如果节点不为说明已经有其他线程进行操作将节点替换为节点等待有消费者消费线程。如果头节点下一个节点是当前节点以防止其他线程已经修改了节点则运算,否则直接返回。 一、介绍 SynchronousQueue是一个双栈双队列算法,无空间的队列或栈,任何一个对SynchronousQueue写需要等到一个对SynchronousQueue的读操作,反之亦然。一个读操作需要等待一个写操作,相当于是...

    jifei 评论0 收藏0
  • TransferQueue实例

    摘要:实际上是公平模式和的超集。而使用操作实现一个非阻塞的方法,这是避免序列化处理任务的关键。在这样的设计中,消费者的消费能力将决定生产者产生消息的速度。实例输出中的模式手记之似懂非懂的和长度为的 序 本文主要简介一下TransferQueue。 TransferQueue TransferQueue(java7引入)继承了BlockingQueue(BlockingQueue又继承了Que...

    MarvinZhang 评论0 收藏0
  • java虚拟机故障处理工具

    摘要:这些工具包括名称主要作用显示指定系统内所有的虚拟机进程。虚拟机堆转存快照分析工具命令用于与搭配使用,用来分析生成的文件。命令格式命令样例线程堆栈跟踪工具用于生成虚拟机当前时刻的线程快照。 概述 给系统定位问题的时候,知识、经验是关键基础,数据是依据,工具是运用知识处理数据的手段。 java开发人员可以在jdk安装的bin目录下找到除了java,javac以外的其他命令。这些命令主要是一...

    loonggg 评论0 收藏0
  • BlockingQueue学习

    摘要:引言在包中,很好的解决了在多线程中,如何高效安全传输数据的问题。同时,也用于自带线程池的缓冲队列中,了解也有助于理解线程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全传输数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时,BlockingQueue也用于...

    xuhong 评论0 收藏0
  • Java多线程进阶(三五)—— J.U.C之collections框架:SynchronousQue

    摘要:三总结主要用于线程之间的数据交换,由于采用无锁算法,其性能一般比单纯的其它阻塞队列要高。它的最大特点时不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、Synchro...

    missonce 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<