资讯专栏INFORMATION COLUMN

java-AQS源码浅析

Lemon_95 / 2639人阅读

摘要:获取资源失败,将该线程加入等待队列尾部,标记为独占模式。如果有剩余资源则会唤醒下一个线程,且整个过程忽略中断的影响。

AQS概念及定义

ASQ:AbstractQueuedSynchronizer

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列,有个内部类Node定义了节点。队列由AQS的volatile成员变量head和tail组成一个双向链表)

资源共享方式

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

自定义同步器

AQS是抽象类,使用了模板方法设计模式,已经将流程定义好,且实现了对等待队列的维护,因此实现者只需要按需实现AQS预留的四个方法即可。

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

核心方法分析

1.1 acquire(int)

方法定义

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。

方法源码
    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
函数流程
1. tryAcquire():尝试获取资源。
2. addWaiter(Node.EXCLUSIVE):获取资源失败,将该线程加入等待队列尾部,标记为独占模式。
3. acquireQueued(Node,int):获取该node指定数量的资源数,会一直等待成功获取才返回,返回值是在获取期间是否中断过
源码分析

1. tryAcquire()

  /**
     * Attempts to acquire in exclusive mode. This method should query
     * if the state of the object permits it to be acquired in the
     * exclusive mode, and if so to acquire it.
     *
     * 

This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * *

The default * implementation throws {@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }

这是个抽象方法,用于给实现者自定义实现,此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义。

2. addWaiter(Node)

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        //新建Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //快速尝试一次,使用CAS将node放到队尾,失败调用enq
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //保证将Node放入队尾
        enq(node);
        return node;
    }

enq源码

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node"s predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果尾节点为空,说明队列还未进行初始化
            if (t == null) { // Must initialize
                //CAS设置头结点
                if (compareAndSetHead(new Node()))
                    //初始头尾相同,从下一次循环开始尝试加入新Node
                    tail = head;
            } else {
                node.prev = t;
                //CAS将当前节点设置为尾节点
                if (compareAndSetTail(t, node)) {
                    //设置成功返回当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

3. acquireQueued(Node, int)

/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        //标志是否成功获取资源
        boolean failed = true;
        try {
            //是否被中断
            boolean interrupted = false;
            for (;;) {
                //获取前驱Node
                final Node p = node.predecessor();
                //如果自己是队列中第二个节点,那会进行尝试获取,进入这里判断要么是一次,要么是被前驱节点给unPark唤醒了。
                if (p == head && tryAcquire(arg)) {
                    //成功获取资源,设置自身为头节点,将原来的头结点剥离队列
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判断是否需要被park,如果需要进行park并检测是否被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果获取资源失败了将当前node取消,
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node"s predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前驱的状态已经是signal,代表前驱释放是会通知唤醒你,那么此node可以安心被park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             //如果前驱已经被取消,那么从当前node一直往前找,直到有非取消的node,直接排在它的后面,此时不需要park,会出去再尝试一次获取资源。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don"t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //前驱节点没有被取消,那么告诉前驱节点释放的时候通知自己
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        //让该线程进入wait状态
        LockSupport.park(this);
        //返回期间是被中断过
        return Thread.interrupted();
    }

acquireQueued流程总结

检查自己是否是老二,且是否能获得资源,能获得自己成为head节点,否则进入流程2。

2.找到“有效”(not canceled)的前驱,并通知前驱释放了要“通知”(watiStatus=signal)我,安心被park。
3。被前驱unpark,或interrrupt(),继续流程1。

acquire小结

首先调用实现者实现的tryAcquire()去获取资源,如果成功则直接返回。

如果失败,则新建一个独占模式的节点加到队列尾部。

通知一个有效的前驱记得释放时唤醒自己,在唤醒时自己再进行不断tryAcquire()直到获取到资源,返回是否被中断过。

如果等待过程中被中断过,则将将中断补上,调用当前线程的interrupt().

至此acquire流程完结,

1.2 release(int)

方法定义

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义。

方法源码
   /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        //调用实现者的尝试解锁方法,因为已经获得锁,所以基本不会失败
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //唤醒下一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor()

 /**
     * Wakes up node"s successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)//设置当前节点的状态允许失败,失败了也没关系。
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
         //找到下一个需要被唤醒的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//如果是空或被取消
            s = null;
            //从尾节点开始寻找,直到找到最前面的有效的节点。因为锁已经释放,所以从尾节点开始找可以避免因为高并发下复杂的队列动态变化带来的逻辑判断,
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
release小结

首先调用实现者的tryRelease(),失败则返回false

成功则找到下一个有效的节点并唤醒它。

注意实现者实现tryRelease应该是当state为0时才返回

1.3 acquireShared(int)

方法定义

此方法是共享模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回。如果有剩余资源则会唤醒下一个线程,否则进入wait,且整个过程忽略中断的影响。

方法源码
    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        //尝试获取指定数量资源
        if (tryAcquireShared(arg) < 0)
            //获取资源直到成功
            doAcquireShared(arg);
    }

共享模式下的流程与独占模式极为相似,首先根据tryAcquireShared(arg)尝试是否能获取到资源,能则直接返回,不能则会进入队列按入队顺序依次唤醒尝试获取。

tryAcquireShared(int)

/**
     * Attempts to acquire in shared mode. This method should query if
     * the state of the object permits it to be acquired in the shared
     * mode, and if so to acquire it.
     *
     * 

This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * *

The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }

这是AQS预留给实现者的方法,用于共享模式下尝试获取指定数量的资源,返回值<0代表获取失败,=0代表获取成功且无剩余资源,>0代表还有剩余资源

doAcquireShared(int)方法用于共享模式获取资源会直到获取成功才返回

/**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        //添加当前线程的Node模式为共享模式至队尾,
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //获取前驱节点
                final Node p = node.predecessor();
                //如果自己是老二才有尝试的资格
                if (p == head) {
                    //尝试获取指定数量资源
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //如果成功获取,将当前节点设置为头节点,如果有剩余资源唤醒下一有效节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //如果有中断,自己补偿中断
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //判断是否需要被park,和park后检查是否被中弄断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //如果获取失败,取消当前节点
            if (failed)
                cancelAcquire(node);
        }
    }

流程和独占模式几乎一模一样,但是代码的书写缺有不同,不知原作者是咋想的。区别于独占不同的有两点

添加模式为SHARED1的Node。

在成功获取到资源后,设置当前节点为head节点时,如果还有剩余资源的话,会唤醒下一个有效的节点,如果资源数量不够下一节点,下一节点会一直等待,直到其它节点释放,并不会让步给后面的节点,取决于FIFO的按顺序出队。

setHeadAndPropagate()看有剩余资源的时候如何唤醒下一节点

/**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        //将当前节点设置为head节点
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don"t know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
         //如果有剩余资源
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //当下一个有效节点存在且是共享模式时,会唤醒它
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

doReleaseShared()唤醒下一共享模式节点

/**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //如果头结点状态是“通知后继”
                if (ws == Node.SIGNAL) {
                    //将其状态改为0,表示已通知
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //唤醒后继
                    unparkSuccessor(h);
                }
                //如果已通知后继,则改为可传播,在下次acquire中的shouldParkAfterFailedAcquire会将改为SIGNAL
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果头结点变了,再次循环
            if (h == head)                   // loop if head changed
                break;
        }
    }
acquireShared小结

共享模式acquire与独占模式技术相同,唯一的不同就是在于如果当前节点获取资源成功且有剩余则会唤醒下一节点,资源可以为多个线程功能分配,而独占模式则就是一个线程独占。

1.4 releaseShared(int)

方法定义

此方法是共享模式下线程释放共享资源的顶层入口。如果释放资源成功,直接返回。如果有剩余资源则会唤醒下一个线程,且整个过程忽略中断的影响。

方法源码
/**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        //尝试共享模式获取资源
        if (tryReleaseShared(arg)) {
            //唤醒下一节点
            doReleaseShared();
            return true;
        }
        return false;
    }
AQS的源码分析就到这里为止由于本人目前功力尚浅,对AQS的理解停留在代码级别,下此会将应用补上,如有不对和遗漏欢迎各位补充。

参考文章
Java并发之AQS详解

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68444.html

相关文章

  • 浅析webpack源码之前言(一)

    为什么读webpack源码 因为前端框架离不开webpack,天天都在用的东西啊,怎能不研究 读源码能学到很多做项目看书学不到的东西,比如说架构,构造函数,es6很边缘的用法,甚至给函数命名也会潜移默化的影响等 想写源码,不看源码怎么行,虽然现在还不知道写什么,就算不写什么,看看别人写的总可以吧 知道世界的广阔,那么多插件,那么多软件开发师,他们在做什么,同样是写js的,怎么他们能这么伟大 好奇...

    suosuopuo 评论0 收藏0
  • 浅析HashMap源码(1)

    摘要:前言本文的目的是阅读理解的源码,作为集合中重要的一个角色,平时用到十分多的一个类,深入理解它,知其所以然很重要。 前言 本文的目的是阅读理解HashMap的源码,作为集合中重要的一个角色,平时用到十分多的一个类,深入理解它,知其所以然很重要。本文基于Jdk1.7,因为Jdk1.8改变了HashMap的数据结构,进行了优化,我们先从基础阅读,之后再阅读理解Jdk1.8的内容 HashMa...

    wwolf 评论0 收藏0
  • 浅析`redux-thunk`中间件源码

    摘要:大多的初学者都会使用中间件来处理异步请求,其理解简单使用方便具体使用可参考官方文档。源码的源码非常简洁,出去空格一共只有行,这行中如果不算上则只有行。官方文档中的一节讲解的非常好,也确实帮我理解了中间件的工作原理,非常推荐阅读。 总觉得文章也应该是有生命力的,欢迎关注我的Github上的博客,这里的文章会依据我本人的见识,逐步更新。 大多redux的初学者都会使用redux-thunk...

    wing324 评论0 收藏0
  • 浅析es6-promise源码

    摘要:主要逻辑本质上还是回调函数那一套。通过的判断完成异步和同步的区分。 主要逻辑: 本质上还是回调函数那一套。通过_subscribers的判断完成异步和同步的区分。通过 resolve,reject -> publish -> invokeCallback -> resolve,reject的递归和下一条then的parent是上一条的child来完成then链的流转 同步情况...

    fox_soyoung 评论0 收藏0
  • 浅析webpack源码之NodeEnvironmentPlugin模块总览(六)

    摘要:进入传入地址出来一个复杂对象把挂载到对象上太复杂我们先看可以缓存输入的文件系统输入文件系统输出文件系统,挂载到对象传入输入文件,监视文件系统,挂载到对象添加事件流打开插件读取目录下文件对文件名进行格式化异步读取目录下文件同步方法就 进入webpack.js //传入地址,new Compiler出来一个复杂对象 compiler = new Compiler(options.conte...

    ChristmasBoy 评论0 收藏0

发表评论

0条评论

Lemon_95

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<