资讯专栏INFORMATION COLUMN

并发数据结构与线程(ArrayBlockingQueue)

yck / 322人阅读

摘要:今天在群上抛出来一个问题,如下我以自带的数据结构为例,用源码的形式说明,如何阻塞线程通知线程的。一以可重入锁和两个对象来控制并发。四使用来控制并发,同时也使用的对象来与线程交互。

今天在QQ群上抛出来一个问题,如下

我以Java自带的数据结构为例,用源码的形式说明,如何阻塞线程、通知线程的。

一、Lock & Condition
ArrayBlockingQueue以可重入锁和两个Condition对象来控制并发。

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

构造函数中初始化了notEmpty和notFull.

    /**
     * Creates an ArrayBlockingQueue with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if true then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if false the access order is unspecified.
     * @throws IllegalArgumentException if capacity is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

二、线程阻塞
当ArrayBlockingQueue存储的元素是0个的时候,take()方法会阻塞.

    public Object take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            Object x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

这里take方法首先获得可重入锁lock,然后判断如果元素为空就执行notEmpty.await(); 这个时候线程挂起。

三、通知线程
比如使用put放入一个新元素,

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue方法中,

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

对刚才的notEmptyCondition进行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock来控制并发,同时也使用ArrayBlockingQueue的Condition对象来与线程交互。notEmptynotFull都是由
ReentrantLock的成员变量sync生成的,

public Condition newCondition() {
        return sync.newCondition();
    }

sync可以认为是一个抽象类类型,Sync,它是在ReentrantLock内部定义的静态抽象类,抽象类实现了newCondition方法,

final ConditionObject newCondition() {
            return new ConditionObject();
        }

返回的类型是实现了Condition接口的ConditionObject类,这是在AbstractQueuedSynchronizer内部定义的类。在ArrayBlockingQueue中的notEmpty就是ConditionObject实例。

阻塞:
ArrayBlockingQueue为空时,notEmpty.await()将自己挂起,如ConditionObject的await方法,

        /**
         * Implements interruptible condition wait.
         * 
    *
  1. If current thread is interrupted, throw InterruptedException. *
  2. Save lock state returned by {@link #getState}. *
  3. Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
  4. Block until signalled or interrupted. *
  5. Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
  6. If interrupted while blocked in step 4, throw InterruptedException. *
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

addConditionWaiter是将当前线程作为一个node加入到ConditionObject的队列中,队列是用链表实现的。
如果是初次加入队列的情况,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就将线程park。

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                ....
}

至此线程被挂起,LockSupport.park(this);这里this是指ConditionObject,是notEmpty.
通知:
当新的元素put进入ArrayBlockingQueue后,notEmpty.signal()通知在这上面等待的线程,如ConditionObject的signal方法,

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal方法,

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

doSignal一开始接收到的参数就是firstWaiter这个参数,在内部实现中用了do..while的形式,首先将first的的nextWaiter找出来保存到firstWaiter此时(first和firstWaiter不是一回事),在while的比较条件中可调用了transferForSignal方法,
整个while比较条件可以看着短路逻辑,如果transferForSignal结果为true,后面的first = firstWaiter就不执行了,整个while循环就结束了。

参照注释,看

transferForSignal方法,

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先确保想要被signal的等待node还是处于Node.CONDITION状态,然后调整状态为Node.SIGNAL,这两个都是采用CAS方法,最后调用的是

LockSupport.unpark(node.thread);

五、LockSupport
至此,我们已经知道了线程的挂起和通知都是使用LockSupport来完成的,并发数据结构与线程直接的交互最终也是需要LockSupport。那么关于LockSupport,我们又可以了解多少呢?

Ref:
Java中的ReentrantLock和synchronized两种锁定机制的对比
Java的LockSupport.park()实现分析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68083.html

相关文章

  • 通俗易懂,JDK 并发容器总结

    摘要:线程安全的线程安全的,在读多写少的场合性能非常好,远远好于高效的并发队列,使用链表实现。这样带来的好处是在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 评论0 收藏0
  • (十五)java多线程并发集合ArrayBlockingQueue

    摘要:本人邮箱欢迎转载转载请注明网址代码已经全部托管有需要的同学自行下载引言做的同学们或多或少的接触过集合框架在集合框架中大多的集合类是线程不安全的比如我们常用的等等我们写一个例子看为什么说是不安全的例子证明是线程不安全的我们开启个线程每个线程向 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    stefan 评论0 收藏0
  • 解读 Java 并发队列 BlockingQueue

    摘要:如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。 前言 本文直接参考 Doug Lea 写的 Java doc 和注释,这也是我们在学习 java 并发包时最好的材料了。希望大家能有所思、有所悟,学习 Doug Lea 的代码风格,并将其优雅...

    maochunguang 评论0 收藏0
  • ArrayBlockingQueueLinkedBlockingQueue

    摘要:序本文主要简单介绍下与。有界无界有界,适合已知最大存储容量的场景可有界可以无界吞吐量在大多数并发的场景下吞吐量比,但是性能不稳定。测试结果表明,的可伸缩性要高于。 序 本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。 对比 queue 阻塞与否 是否有界 线程安全保障 适用场景 注意事项 ArrayBlockingQueue 阻...

    jackwang 评论0 收藏0
  • 并发包入坑指北』之阻塞队列

    摘要:自己实现在自己实现之前先搞清楚阻塞队列的几个特点基本队列特性先进先出。消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。最终的队列大小为,可见线程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 较长一段时间以来我都发现不少开发者对 jdk 中的 J.U.C(java.util.c...

    nicercode 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<