资讯专栏INFORMATION COLUMN

AbstractQueuedSynchronizer理解之一(ReentrantLock)

bigdevil_s / 1055人阅读

摘要:有了这个基础,才能发挥作用,使得在节点取消和异常时能够保证队列在多线程下的完整性。

Doug Lea是JDK中concurrent工具包的作者,这位大神是谁可以自行google。

本文浅析ReentrantLock(可重入锁)的原理

Lock接口

Lock接口定义了这几个方法:

lock()
用来获取锁,如果锁已经被其他线程占有,则进行等待,直到抢占到锁;该方法在发送异常时不会自动释放锁,所以在使用时需要在finall块中释放锁;

tryLock()和tryLock(long time, TimeUnit unit)
尝试获得锁,如果锁已经被其他线程占有,返回false,成功获取锁返回true;该方法不会等待,立即返回;而带有参数的tryLock在等待时长内拿到锁返回true,超时或者没拿到锁返回false;带参数的方法还支持响应中断;

lockInterruptibly()
支持中断的lock();

unlock()
释放锁;

newCondition()
新建Condition,Condition以后会分析;

ReentrantLock可重入锁

ReentrantLock实现了Lock接口,ReentrantLock中有一个重要的成员变量,同步器sync继承了AbstractQueuedSynchronizer简称AQS,我们先介绍AQS

AQS用一个队列(结构是一个FIFO队列)来管理同步状态,当线程获取同步状态失败时,会将当前线程包装成一个Node放入队列,当前线程进入阻塞状态;当同步状态释放时,会从队列去出线程获取同步状态。

AQS里定义了head、tail、state,他们都是volatile修饰的,head指向队列的第一个元素,tail指向队列的最后一个元素,state表示了同步状态,这个状态非常重要,在ReentrantLock中,state为0的时候代表锁被释放,state为1时代表锁已经被占用;

看下面代码:

</>复制代码

  1. private static final Unsafe unsafe = Unsafe.getUnsafe();
  2. private static final long stateOffset;
  3. private static final long headOffset;
  4. private static final long tailOffset;
  5. private static final long waitStatusOffset;
  6. private static final long nextOffset;
  7. static {
  8. try {
  9. stateOffset = unsafe.objectFieldOffset
  10. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  11. headOffset = unsafe.objectFieldOffset
  12. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  13. tailOffset = unsafe.objectFieldOffset
  14. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  15. waitStatusOffset = unsafe.objectFieldOffset
  16. (Node.class.getDeclaredField("waitStatus"));
  17. nextOffset = unsafe.objectFieldOffset
  18. (Node.class.getDeclaredField("next"));
  19. } catch (Exception ex) { throw new Error(ex); }
  20. }

这一段静态初始化代码初始了state、head、tail等变量的在内存中的偏移量;Unsafe类是sun.misc下的类,不属于java标准。Unsafe让java可以像C语言一样操作内存指针,其中就提供了CAS的一些原子操作和park、unpark对线程挂起与恢复的操作;关于CAS是concurrent工具包的基础,以后会多带带介绍,其主要作用就是在硬件级别提供了compareAndSwap的功能,从而实现了比较和交换的原子性操作。

AQS还有一个内部类叫Node,它将线程封装,利用prev和next可以将Node串连成双向链表,这就是一开始说的FIFO的结构;

ReentrantLock提供了公平锁和非公平锁,我们这里从非公平锁分析AQS的应用;
Lock调用lock()方法时调用了AQS的lock()方法,我们来看这个非公平锁NonfairSync的lock方法:

</>复制代码

  1. final void lock() {
  2. //首先调用CAS抢占同步状态state,如果成功则将当前线程设置为同步器的独占线程,
  3. //这也是非公平的体现,因为新来的线程没有马上加入队列尾部,而是先尝试抢占同步状态。
  4. if (compareAndSetState(0, 1))
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else
  7. //抢占同步状态失败,调用AQS的acquire
  8. acquire(1);
  9. }

瞄一眼acquire方法:

</>复制代码

  1. public final void acquire(int arg) {
  2. //在这里还是先试着抢占一下同步状态
  3. if (!tryAcquire(arg) &&
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. selfInterrupt();
  6. }

tryAcquire调用的是NonfairSync的实现,然后又调用了Sync的nonfairTryAcquire方法:

</>复制代码

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. //和之前一样,利用CAS抢占同步状态,成功则设置当前线程为独占线程并且返回true
  6. if (compareAndSetState(0, acquires)) {
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. //如果当前线程已经是独占线程,即当前线程已经获得了同步状态则将同步状态state加1,
  12. //这里是可重入锁的体现
  13. else if (current == getExclusiveOwnerThread()) {
  14. int nextc = c + acquires;
  15. if (nextc < 0) // overflow
  16. throw new Error("Maximum lock count exceeded");
  17. setState(nextc);
  18. return true;
  19. }
  20. //没有抢占到同步状态返回false
  21. return false;
  22. }

再看addWaiter方法:

</>复制代码

  1. private Node addWaiter(Node mode) {
  2. //新建一个Node,封装了当前线程和模式,这里传入的是独占模式Node.EXCLUSIVE
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // Try the fast path of enq; backup to full enq on failure
  5. Node pred = tail;
  6. //如果tail不为空就不需要初始化node队列了
  7. if (pred != null) {
  8. //将node作为队列最后一个元素入列
  9. node.prev = pred;
  10. if (compareAndSetTail(pred, node)) {
  11. pred.next = node;
  12. //返回新建的node
  13. return node;
  14. }
  15. }
  16. //如果tail为空则表示node队列还没有初始化,此时初始化队列
  17. enq(node);
  18. return node;
  19. }

瞄一眼enq方法:

</>复制代码

  1. private Node enq(final Node node) {
  2. //无限loop直到CAS成功,其他地方也大量使用了无限loop
  3. for (;;) {
  4. Node t = tail;
  5. if (t == null) { // Must initialize
  6. //队列尾部为空,必须初始化,head初始化为一个空node,不包含线程,tail = head
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else {
  10. //队列已经初始化,将当前node加在列尾
  11. node.prev = t;
  12. //将当前node设置为tail,CAS操作,enqueue安全
  13. if (compareAndSetTail(t, node)) {
  14. t.next = node;
  15. return t;
  16. }
  17. }
  18. }
  19. }

拿到新建的node后传给acquireQueued方法:

</>复制代码

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. //标记是否中断状态
  5. boolean interrupted = false;
  6. for (;;) {
  7. //拿到当前node的前驱
  8. final Node p = node.predecessor();
  9. //如果前驱正好为head,即当前线程在列首,马上tryAcquire抢占同步状态
  10. if (p == head && tryAcquire(arg)) {
  11. //抢占成功后,将当前节点的thread、prev清空作为head
  12. setHead(node);
  13. p.next = null; // help GC 原来的head等待GC回收
  14. failed = false;
  15. return interrupted;
  16. }
  17. //没有抢占成功后,判断是否要park
  18. if (shouldParkAfterFailedAcquire(p, node) &&
  19. parkAndCheckInterrupt())
  20. interrupted = true;
  21. }
  22. } finally {
  23. if (failed)
  24. cancelAcquire(node);
  25. }
  26. }

瞄一眼shouldParkAfterFailedAcquire方法:

</>复制代码

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. if (ws == Node.SIGNAL)
  4. //如果前驱node的状态为SIGNAL,说明当前node可以park
  5. /*
  6. * This node has already set status asking a release
  7. * to signal it, so it can safely park.
  8. */
  9. return true;
  10. if (ws > 0) {
  11. //如果前驱的状态大于0说明前驱node的thread已经被取消
  12. /*
  13. * Predecessor was cancelled. Skip over predecessors and
  14. * indicate retry.
  15. */
  16. do {
  17. //从前驱node开始,将取消的node移出队列
  18. //当前节点之前的节点不会变化,所以这里可以更新prev,而且不必用CAS来更新。
  19. node.prev = pred = pred.prev;
  20. } while (pred.waitStatus > 0);
  21. pred.next = node;
  22. } else {
  23. //前驱node状态等于0或者为PROPAGATE(以后会介绍)
  24. //将前驱node状态设置为SIGNAL,返回false,表示当前node暂不需要park,
  25. //可以再尝试一下抢占同步状态
  26. /*
  27. * waitStatus must be 0 or PROPAGATE. Indicate that we
  28. * need a signal, but don"t park yet. Caller will need to
  29. * retry to make sure it cannot acquire before parking.
  30. */
  31. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  32. }
  33. return false;
  34. }

看一下parkAndCheckInterrupt方法:

</>复制代码

  1. private final boolean parkAndCheckInterrupt() {
  2. //阻塞当前线程
  3. LockSupport.park(this);
  4. //返回当前线程是否设置中断标志,并清空中断标志
  5. return Thread.interrupted();
  6. }

这里解释一下为什么要保存一下中断标志:中断会唤醒被park的阻塞线程,但被park的阻塞线程不会响应中断,所以这里保存一下中断状态并返回,如果状态为true说明发生过中断,会补发一次中断,即调用interrupt()方法

在acquireQueued中发生异常时执行cancelAcquire:

</>复制代码

  1. private void cancelAcquire(Node node) {
  2. // Ignore if node doesn"t exist
  3. if (node == null)
  4. return;
  5. //清空node的线程
  6. node.thread = null;
  7. // Skip cancelled predecessors
  8. //移除被取消的前继node,这里只移动了node的prev,没有改变next
  9. Node pred = node.prev;
  10. while (pred.waitStatus > 0)
  11. node.prev = pred = pred.prev;
  12. // predNext is the apparent node to unsplice. CASes below will
  13. // fail if not, in which case, we lost race vs another cancel
  14. // or signal, so no further action is necessary.
  15. //获取前继node的后继node
  16. Node predNext = pred.next;
  17. // Can use unconditional write instead of CAS here.
  18. // After this atomic step, other Nodes can skip past us.
  19. // Before, we are free of interference from other threads.
  20. //设置当前node等待状态为取消,其他线程检测到取消状态会移除它们
  21. node.waitStatus = Node.CANCELLED;
  22. // If we are the tail, remove ourselves.
  23. if (node == tail && compareAndSetTail(node, pred)) {
  24. //如果当前node为tail,将前驱node设置为tail(CAS)
  25. //设置前驱node(即现在的tail)的后继为null(CAS)
  26. //此时,如果中间有取消的node,将没有引用指向它,将被GC回收
  27. compareAndSetNext(pred, predNext, null);
  28. } else {
  29. // If successor needs signal, try to set pred"s next-link
  30. // so it will get one. Otherwise wake it up to propagate.
  31. int ws;
  32. if (pred != head &&
  33. ((ws = pred.waitStatus) == Node.SIGNAL ||
  34. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  35. pred.thread != null) {
  36. //如果当前node既不是head也不是tail,设置前继node的后继为当前node后继
  37. Node next = node.next;
  38. if (next != null && next.waitStatus <= 0)
  39. compareAndSetNext(pred, predNext, next);
  40. } else {
  41. //唤醒当前node后继
  42. unparkSuccessor(node);
  43. }
  44. //当前node的next设置为自己
  45. //注意现在当前node的后继的prev还指向当前node,所以当前node还未被删除,prev是在移除取消节点时更新的
  46. //这里就是为什么在前面要从后往前找可换新的node原因了,next会导致死循环
  47. node.next = node; // help GC
  48. }
  49. }

画图描述解析一下cancelAcquire:

首先看如何跳过取消的前驱

这时,前驱被取消的node并没有被移出队列,前驱的前驱的next还指向前驱;

如果当前node是tail的情况:

这时,没有任何引用指向当前node;

如果当前node既不是tail也不是head:

这时,当前node的前驱的next指向当前node的后继,当前node的next指向自己,pre都没有更新;

如果当前node是head的后继:

这时,只是简单的将当前node的next指向自己;

到这里,当线程抢占同步状态的时候,会进入FIFO队列等待同步状态被释放。在unlock()方法中调用了同步器的release方法;看一下release方法:

</>复制代码

  1. public final boolean release(int arg) {
  2. //判断是否释放同步状态成功
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. //如果head不为null,且head的等待状态不为0
  7. //唤醒后继node的线程
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }

再来看一下tryRelease方法(在Sync类中实现):

</>复制代码

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases;
  3. //当前thread不是独占模式的那个线程,抛出异常
  4. if (Thread.currentThread() != getExclusiveOwnerThread())
  5. throw new IllegalMonitorStateException();
  6. boolean free = false;
  7. if (c == 0) {
  8. //如果同步状态state为0,释放成功,将独占线程设置为null
  9. free = true;
  10. setExclusiveOwnerThread(null);
  11. }
  12. //更新同步状态state
  13. setState(c);
  14. return free;
  15. }

继续看unparkSuccessor(唤醒后继node的tread)方法:

</>复制代码

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * If status is negative (i.e., possibly needing signal) try
  4. * to clear in anticipation of signalling. It is OK if this
  5. * fails or if status is changed by waiting thread.
  6. */
  7. int ws = node.waitStatus;
  8. if (ws < 0)
  9. //head的等待状态为负数,设置head的等待状态为0
  10. compareAndSetWaitStatus(node, ws, 0);
  11. /*
  12. * Thread to unpark is held in successor, which is normally
  13. * just the next node. But if cancelled or apparently null,
  14. * traverse backwards from tail to find the actual
  15. * non-cancelled successor.
  16. */
  17. Node s = node.next;
  18. if (s == null || s.waitStatus > 0) {
  19. //如果head的后继node不存在或者后继node等待状态大于0(即取消)
  20. //从尾部往当前node迭代找到等待状态为负数的node,unpark
  21. //因为会有取消的节点
  22. s = null;
  23. for (Node t = tail; t != null && t != node; t = t.prev)
  24. if (t.waitStatus <= 0)
  25. s = t;
  26. }
  27. if (s != null)
  28. LockSupport.unpark(s.thread);
  29. }
总结

介绍完ReentrantLock后,我们大体了解了AQS的工作原理。AQS主要就是使用了同步状态和队列实现了锁的功能。有了CAS这个基础,AQS才能发挥作用,使得在enqueue、dequeque、节点取消和异常时能够保证队列在多线程下的完整性。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76306.html

相关文章

  • AbstractQueuedSynchronizer理解之一ReentrantLock

    摘要:有了这个基础,才能发挥作用,使得在节点取消和异常时能够保证队列在多线程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,这位大神是谁可以自行google。 本文浅析ReentrantLock(可重入锁)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定义了这几个...

    learning 评论0 收藏0
  • AbstractQueuedSynchronizer理解之一ReentrantLock

    摘要:有了这个基础,才能发挥作用,使得在节点取消和异常时能够保证队列在多线程下的完整性。 Doug Lea是JDK中concurrent工具包的作者,这位大神是谁可以自行google。 本文浅析ReentrantLock(可重入锁)的原理 Lock接口 showImg(https://segmentfault.com/img/bV2671?w=276&h=176); Lock接口定义了这几个...

    yunhao 评论0 收藏0
  • AbstractQueuedSynchronizer 理解 ReentrantLock

    摘要:当前线程已经获取过这个锁,则此时是重入,改变的计数即可,返回表示加锁成功。的核心在于使用更新锁的状态,并利用一个同步队列将获取锁失败的线程进行排队,当前驱节点解锁后再唤醒后继节点,是一个几乎纯实现的加锁与解锁。 简介 Java 并发编程离不开锁, Synchronized 是常用的一种实现加锁的方式,使用比较简单快捷。在 Java 中还有另一种锁,即 Lock 锁。 Lock 是一个接...

    LeoHsiun 评论0 收藏0
  • Java多线程进阶(七)—— J.U.C之locks框架:AQS独占功能剖析(2)

    摘要:开始获取锁终于轮到出场了,的调用过程和完全一样,同样拿不到锁,然后加入到等待队列队尾然后,在阻塞前需要把前驱结点的状态置为,以确保将来可以被唤醒至此,的执行也暂告一段落了安心得在等待队列中睡觉。 showImg(https://segmentfault.com/img/remote/1460000016012467); 本文首发于一世流云的专栏:https://segmentfault...

    JayChen 评论0 收藏0
  • Java 重入锁 ReentrantLock 原理分析

    摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...

    lx1036 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<