资讯专栏INFORMATION COLUMN

Java多线程进阶(十一)—— J.U.C之locks框架:StampedLock

libxd / 1633人阅读

摘要:的引入先来看下,为什么有了,还要引入使得多个读线程同时持有读锁只要写锁未被占用,而写锁是独占的。部分常量的比特位表示如下另外,相比,对多核进行了优化,可以看到,当核数超过时,会有一些自旋操作示例分析假设现在有三个线程。

</>复制代码

  1. 本文首发于一世流云的专栏:https://segmentfault.com/blog...
一、StampedLock类简介

StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细粒度控制并发。

首先明确下,该类的设计初衷是作为一个内部工具类,用于辅助开发其它线程安全组件,用得好,该类可以提升系统性能,用不好,容易产生死锁和其它莫名其妙的问题。

1.1 StampedLock的引入

</>复制代码

  1. 先来看下,为什么有了ReentrantReadWriteLock,还要引入StampedLock?

ReentrantReadWriteLock使得多个读线程同时持有读锁(只要写锁未被占用),而写锁是独占的。

但是,读写锁如果使用不当,很容易产生“饥饿”问题:

比如在读线程非常多,写线程很少的情况下,很容易导致写线程“饥饿”,虽然使用“公平”策略可以一定程度上缓解这个问题,但是“公平”策略是以牺牲系统吞吐量为代价的。(在ReentrantLock类的介绍章节中,介绍过这种情况)

1.2 StampedLock的特点

StampedLock的主要特点概括一下,有以下几点:

所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功;

所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;

StampedLock是不可重入的;(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)

StampedLock有三种访问模式:
①Reading(读模式):功能和ReentrantReadWriteLock的读锁类似
②Writing(写模式):功能和ReentrantReadWriteLock的写锁类似
③Optimistic reading(乐观读模式):这是一种优化的读模式。

StampedLock支持读锁和写锁的相互转换
我们知道RRW中,当线程获取到写锁后,可以降级为读锁,但是读锁是不能直接升级为写锁的。
StampedLock提供了读锁和写锁相互转换的功能,使得该类支持更多的应用场景。

无论写锁还是读锁,都不支持Conditon等待

</>复制代码

  1. 我们知道,在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会阻塞。
    但是,在Optimistic reading中,即使读线程获取到了读锁,写线程尝试获取写锁也不会阻塞,这相当于对读模式的优化,但是可能会导致数据不一致的问题。所以,当使用Optimistic reading获取到读锁时,必须对获取结果进行校验。
二、StampedLock使用示例

先来看一个Oracle官方的例子:

</>复制代码

  1. class Point {
  2. private double x, y;
  3. private final StampedLock sl = new StampedLock();
  4. void move(double deltaX, double deltaY) {
  5. long stamp = sl.writeLock(); //涉及对共享资源的修改,使用写锁-独占操作
  6. try {
  7. x += deltaX;
  8. y += deltaY;
  9. } finally {
  10. sl.unlockWrite(stamp);
  11. }
  12. }
  13. /**
  14. * 使用乐观读锁访问共享资源
  15. * 注意:乐观读锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,
  16. * 而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
  17. *
  18. * @return
  19. */
  20. double distanceFromOrigin() {
  21. long stamp = sl.tryOptimisticRead(); // 使用乐观读锁
  22. double currentX = x, currentY = y; // 拷贝共享资源到本地方法栈中
  23. if (!sl.validate(stamp)) { // 如果有写锁被占用,可能造成数据不一致,所以要切换到普通读锁模式
  24. stamp = sl.readLock();
  25. try {
  26. currentX = x;
  27. currentY = y;
  28. } finally {
  29. sl.unlockRead(stamp);
  30. }
  31. }
  32. return Math.sqrt(currentX * currentX + currentY * currentY);
  33. }
  34. void moveIfAtOrigin(double newX, double newY) { // upgrade
  35. // Could instead start with optimistic, not read mode
  36. long stamp = sl.readLock();
  37. try {
  38. while (x == 0.0 && y == 0.0) {
  39. long ws = sl.tryConvertToWriteLock(stamp); //读锁转换为写锁
  40. if (ws != 0L) {
  41. stamp = ws;
  42. x = newX;
  43. y = newY;
  44. break;
  45. } else {
  46. sl.unlockRead(stamp);
  47. stamp = sl.writeLock();
  48. }
  49. }
  50. } finally {
  51. sl.unlock(stamp);
  52. }
  53. }
  54. }

可以看到,上述示例最特殊的其实是distanceFromOrigin方法,这个方法中使用了“Optimistic reading”乐观读锁,使得读写可以并发执行,但是“Optimistic reading”的使用必须遵循以下模式:

</>复制代码

  1. long stamp = lock.tryOptimisticRead(); // 非阻塞获取版本信息
  2. copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
  3. if(!lock.validate(stamp)){ // 校验
  4. long stamp = lock.readLock(); // 获取读锁
  5. try {
  6. copyVaraibale2ThreadMemory(); // 拷贝变量到线程本地堆栈
  7. } finally {
  8. lock.unlock(stamp); // 释放悲观锁
  9. }
  10. }
  11. useThreadMemoryVarables(); // 使用线程本地堆栈里面的数据进行操作
三、StampedLock原理 3.1 StampedLock的内部常量

StampedLock虽然不像其它锁一样定义了内部类来实现AQS框架,但是StampedLock的基本实现思路还是利用CLH队列进行线程的管理,通过同步状态值来表示锁的状态和类型。

StampedLock内部定义了很多常量,定义这些常量的根本目的还是和ReentrantReadWriteLock一样,对同步状态值按位切分,以通过位运算对State进行操作:

</>复制代码

  1. 对于StampedLock来说,写锁被占用的标志是第8位为1,读锁使用0-7位,正常情况下读锁数目为1-126,超过126时,使用一个名为readerOverflowint整型保存超出数。

部分常量的比特位表示如下:

另外,StampedLock相比ReentrantReadWriteLock,对多核CPU进行了优化,可以看到,当CPU核数超过1时,会有一些自旋操作:

3.2 示例分析

</>复制代码

  1. 假设现在有三个线程:ThreadA、ThreadB、ThreadC、ThreadD。操作如下:
    //ThreadA调用writeLock, 获取写锁
    //ThreadB调用readLock, 获取读锁
    //ThreadC调用readLock, 获取读锁
    //ThreadD调用writeLock, 获取写锁
    //ThreadE调用readLock, 获取读锁
1. StampedLock对象的创建

StampedLock的构造器很简单,构造时设置下同步状态值:

另外,StamedLock提供了三类视图:

这些视图其实是对StamedLock方法的封装,便于习惯了ReentrantReadWriteLock的用户使用:
例如,ReadLockView其实相当于ReentrantReadWriteLock.readLock()返回的读锁;

2. ThreadA调用writeLock获取写锁

来看下writeLock方法:

StampedLock中大量运用了位运算,这里(s = state) & ABITS == 0L 表示读锁和写锁都未被使用,这里写锁可以立即获取成功,然后CAS操作更新同步状态值State。

操作完成后,等待队列的结构如下:

</>复制代码

  1. 注意:StampedLock中,等待队列的结点要比AQS中简单些,仅仅三种状态。
    0:初始状态
    -1:等待中
    1:取消

另外,结点的定义中有个cowait字段,该字段指向一个栈,用于保存读线程,这个后续会讲到。

3. ThreadB调用readLock获取读锁

来看下readLock方法:
由于ThreadA此时持有写锁,所以ThreadB获取读锁失败,将调用acquireRead方法,加入等待队列:

acquireRead方法非常复杂,用到了大量自旋操作:

</>复制代码

  1. /**
  2. * 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程
  3. *
  4. * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
  5. * @param deadline 如果非0, 则表示限时获取
  6. * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
  7. */
  8. private long acquireRead(boolean interruptible, long deadline) {
  9. WNode node = null, p; // node指向入队结点, p指向入队前的队尾结点
  10. /**
  11. * 自旋入队操作
  12. * 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.
  13. * 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)
  14. */
  15. for (int spins = -1; ; ) {
  16. WNode h;
  17. if ((h = whead) == (p = wtail)) { // 如果队列为空或只有头结点, 则会立即尝试获取读锁
  18. for (long m, s, ns; ; ) {
  19. if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
  20. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
  21. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
  22. return ns; // 获取成功后, 直接返回
  23. else if (m >= WBIT) { // 写锁被占用,以随机方式探测是否要退出自旋
  24. if (spins > 0) {
  25. if (LockSupport.nextSecondarySeed() >= 0)
  26. --spins;
  27. } else {
  28. if (spins == 0) {
  29. WNode nh = whead, np = wtail;
  30. if ((nh == h && np == p) || (h = nh) != (p = np))
  31. break;
  32. }
  33. spins = SPINS;
  34. }
  35. }
  36. }
  37. }
  38. if (p == null) { // p == null表示队列为空, 则初始化队列(构造头结点)
  39. WNode hd = new WNode(WMODE, null);
  40. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  41. wtail = hd;
  42. } else if (node == null) { // 将当前线程包装成读结点
  43. node = new WNode(RMODE, p);
  44. } else if (h == p || p.mode != RMODE) { // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋
  45. if (node.prev != p)
  46. node.prev = p;
  47. else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
  48. p.next = node;
  49. break;
  50. }
  51. }
  52. // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )
  53. else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) { // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点
  54. node.cowait = null;
  55. } else {
  56. for (; ; ) {
  57. WNode pp, c;
  58. Thread w;
  59. // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链
  60. if ((h = whead) != null && (c = h.cowait) != null &&
  61. U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  62. (w = c.thread) != null) // help release
  63. U.unpark(w);
  64. if (h == (pp = p.prev) || h == p || pp == null) {
  65. long m, s, ns;
  66. do {
  67. if ((m = (s = state) & ABITS) < RFULL ?
  68. U.compareAndSwapLong(this, STATE, s,
  69. ns = s + RUNIT) :
  70. (m < WBIT &&
  71. (ns = tryIncReaderOverflow(s)) != 0L))
  72. return ns;
  73. } while (m < WBIT);
  74. }
  75. if (whead == h && p.prev == pp) {
  76. long time;
  77. if (pp == null || h == p || p.status > 0) {
  78. node = null; // throw away
  79. break;
  80. }
  81. if (deadline == 0L)
  82. time = 0L;
  83. else if ((time = deadline - System.nanoTime()) <= 0L)
  84. return cancelWaiter(node, p, false);
  85. Thread wt = Thread.currentThread();
  86. U.putObject(wt, PARKBLOCKER, this);
  87. node.thread = wt;
  88. if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
  89. // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程
  90. U.park(false, time);
  91. }
  92. node.thread = null;
  93. U.putObject(wt, PARKBLOCKER, null);
  94. if (interruptible && Thread.interrupted())
  95. return cancelWaiter(node, p, true);
  96. }
  97. }
  98. }
  99. }
  100. for (int spins = -1; ; ) {
  101. WNode h, np, pp;
  102. int ps;
  103. if ((h = whead) == p) { // 如果当前线程是队首结点, 则尝试获取读锁
  104. if (spins < 0)
  105. spins = HEAD_SPINS;
  106. else if (spins < MAX_HEAD_SPINS)
  107. spins <<= 1;
  108. for (int k = spins; ; ) { // spin at head
  109. long m, s, ns;
  110. if ((m = (s = state) & ABITS) < RFULL ? // 判断写锁是否被占用
  111. U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //写锁未占用,且读锁数量未超限, 则更新同步状态
  112. (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中
  113. // 获取读锁成功, 释放cowait链中的所有读结点
  114. WNode c;
  115. Thread w;
  116. // 释放头结点, 当前队首结点成为新的头结点
  117. whead = node;
  118. node.prev = null;
  119. // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点
  120. while ((c = node.cowait) != null) {
  121. if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
  122. U.unpark(w);
  123. }
  124. return ns;
  125. } else if (m >= WBIT &&
  126. LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
  127. break;
  128. }
  129. } else if (h != null) { // 如果头结点存在cowait链, 则唤醒链中所有读线程
  130. WNode c;
  131. Thread w;
  132. while ((c = h.cowait) != null) {
  133. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
  134. (w = c.thread) != null)
  135. U.unpark(w);
  136. }
  137. }
  138. if (whead == h) {
  139. if ((np = node.prev) != p) {
  140. if (np != null)
  141. (p = np).next = node; // stale
  142. } else if ((ps = p.status) == 0) // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点
  143. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  144. else if (ps == CANCELLED) {
  145. if ((pp = p.prev) != null) {
  146. node.prev = pp;
  147. pp.next = node;
  148. }
  149. } else { // 阻塞当前读线程
  150. long time;
  151. if (deadline == 0L)
  152. time = 0L;
  153. else if ((time = deadline - System.nanoTime()) <= 0L) //限时等待超时, 取消等待
  154. return cancelWaiter(node, node, false);
  155. Thread wt = Thread.currentThread();
  156. U.putObject(wt, PARKBLOCKER, this);
  157. node.thread = wt;
  158. if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
  159. // 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程
  160. U.park(false, time);
  161. }
  162. node.thread = null;
  163. U.putObject(wt, PARKBLOCKER, null);
  164. if (interruptible && Thread.interrupted())
  165. return cancelWaiter(node, node, true);
  166. }
  167. }
  168. }
  169. }

我们来分析下这个方法。
该方法会首先自旋的尝试获取读锁,获取成功后,就直接返回;否则,会将当前线程包装成一个读结点,插入到等待队列。
由于,目前等待队列还是空,所以ThreadB会初始化队列,然后将自身包装成一个读结点,插入队尾,然后在下面这个地方跳出自旋:

此时,等待队列的结构如下:

跳出自旋后,ThreadB会继续向下执行,进入下一个自旋,在下一个自旋中,依然会再次尝试获取读锁,如果这次再获取不到,就会将前驱的等待状态置为WAITING, 表示我(当前线程)要去睡了(阻塞),到时记得叫醒我:

最终, ThreadB进入阻塞状态:

最终,等待队列的结构如下:

4. ThreadC调用readLock获取读锁

这个过程和ThreadB获取读锁一样,区别在于ThreadC被包装成结点加入等待队列后,是链接到ThreadB结点的栈指针中的。调用完下面这段代码后,ThreadC会链接到以Thread B为栈顶指针的栈中:

</>复制代码

  1. 注意:读结点的cowait字段其实构成了一个栈,入栈的过程其实是个“头插法”插入单链表的过程。比如,再来个ThreadX读结点,则cowait链表结构为:ThreadB - > ThreadX -> ThreadC。最终唤醒读结点时,将从栈顶开始。

然后会在下一次自旋中,阻塞当前读线程:

最终,等待队列的结构如下:

可以看到,此时ThreadC结点并没有把它的前驱的等待状态置为-1,因为ThreadC是链接到栈中的,当写锁释放的时候,会从栈底元素开始,唤醒栈中所有读结点。

5. ThreadD调用writeLock获取写锁

ThreadD调用writeLock方法获取写锁失败后(ThreadA依然占用着写锁),会调用acquireWrite方法,该方法整体逻辑和acquireRead差不多,首先自旋的尝试获取写锁,获取成功后,就直接返回;否则,会将当前线程包装成一个写结点,插入到等待队列。

acquireWrite源码:

</>复制代码

  1. /**
  2. * 尝试自旋的获取写锁, 获取不到则阻塞线程
  3. *
  4. * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED
  5. * @param deadline 如果非0, 则表示限时获取
  6. * @return 非0表示获取成功, INTERRUPTED表示中途被中断过
  7. */
  8. private long acquireWrite(boolean interruptible, long deadline) {
  9. WNode node = null, p;
  10. /**
  11. * 自旋入队操作
  12. * 如果没有任何锁被占用, 则立即尝试获取写锁, 获取成功则返回.
  13. * 如果存在锁被使用, 则将当前线程包装成独占结点, 并插入等待队列尾部
  14. */
  15. for (int spins = -1; ; ) {
  16. long m, s, ns;
  17. if ((m = (s = state) & ABITS) == 0L) { // 没有任何锁被占用
  18. if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) // 尝试立即获取写锁
  19. return ns; // 获取成功直接返回
  20. } else if (spins < 0)
  21. spins = (m == WBIT && wtail == whead) ? SPINS : 0;
  22. else if (spins > 0) {
  23. if (LockSupport.nextSecondarySeed() >= 0)
  24. --spins;
  25. } else if ((p = wtail) == null) { // 队列为空, 则初始化队列, 构造队列的头结点
  26. WNode hd = new WNode(WMODE, null);
  27. if (U.compareAndSwapObject(this, WHEAD, null, hd))
  28. wtail = hd;
  29. } else if (node == null) // 将当前线程包装成写结点
  30. node = new WNode(WMODE, p);
  31. else if (node.prev != p)
  32. node.prev = p;
  33. else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // 链接结点至队尾
  34. p.next = node;
  35. break;
  36. }
  37. }
  38. for (int spins = -1; ; ) {
  39. WNode h, np, pp;
  40. int ps;
  41. if ((h = whead) == p) { // 如果当前结点是队首结点, 则立即尝试获取写锁
  42. if (spins < 0)
  43. spins = HEAD_SPINS;
  44. else if (spins < MAX_HEAD_SPINS)
  45. spins <<= 1;
  46. for (int k = spins; ; ) { // spin at head
  47. long s, ns;
  48. if (((s = state) & ABITS) == 0L) { // 写锁未被占用
  49. if (U.compareAndSwapLong(this, STATE, s,
  50. ns = s + WBIT)) { // CAS修改State: 占用写锁
  51. // 将队首结点从队列移除
  52. whead = node;
  53. node.prev = null;
  54. return ns;
  55. }
  56. } else if (LockSupport.nextSecondarySeed() >= 0 &&
  57. --k <= 0)
  58. break;
  59. }
  60. } else if (h != null) { // 唤醒头结点的栈中的所有读线程
  61. WNode c;
  62. Thread w;
  63. while ((c = h.cowait) != null) {
  64. if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
  65. U.unpark(w);
  66. }
  67. }
  68. if (whead == h) {
  69. if ((np = node.prev) != p) {
  70. if (np != null)
  71. (p = np).next = node; // stale
  72. } else if ((ps = p.status) == 0) // 将当前结点的前驱置为WAITING, 表示当前结点会进入阻塞, 前驱将来需要唤醒我
  73. U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
  74. else if (ps == CANCELLED) {
  75. if ((pp = p.prev) != null) {
  76. node.prev = pp;
  77. pp.next = node;
  78. }
  79. } else { // 阻塞当前调用线程
  80. long time; // 0 argument to park means no timeout
  81. if (deadline == 0L)
  82. time = 0L;
  83. else if ((time = deadline - System.nanoTime()) <= 0L)
  84. return cancelWaiter(node, node, false);
  85. Thread wt = Thread.currentThread();
  86. U.putObject(wt, PARKBLOCKER, this);
  87. node.thread = wt;
  88. if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
  89. U.park(false, time); // emulate LockSupport.park
  90. node.thread = null;
  91. U.putObject(wt, PARKBLOCKER, null);
  92. if (interruptible && Thread.interrupted())
  93. return cancelWaiter(node, node, true);
  94. }
  95. }
  96. }
  97. }

acquireWrite中的下面这个自旋操作,用于将线程包装成写结点,插入队尾:

插入完成后,队列结构如下:

然后,进入下一个自旋,并在下一个自旋中阻塞ThreadD,最终队列结构如下:

6. ThreadE调用readLock获取读锁

同样,由于写锁被ThreadA占用着,所以最终会调用acquireRead方法,在该方法的第一个自旋中,会将ThreadE加入等待队列:

</>复制代码

  1. 注意,由于队尾结点是写结点,所以当前读结点会直接链接到队尾;如果队尾是读结点,则会链接到队尾读结点的cowait链中。

然后进入第二个自旋,阻塞ThreadE,最终队列结构如下:

7. ThreadA调用unlockWrite释放写锁

通过CAS操作,修改State成功后,会调用release方法唤醒等待队列的队首结点:

release方法非常简单,先将头结点的等待状态置为0,表示即将唤醒后继结点,然后立即唤醒队首结点:

此时,等待队列的结构如下:

8. ThreadB被唤醒后继续向下执行

ThreadB被唤醒后,会从原阻塞处继续向下执行,然后开始下一次自旋:

第二次自旋时,ThreadB发现写锁未被占用,则成功获取到读锁,然后从栈顶(ThreadB的cowait指针指向的结点)开始唤醒栈中所有线程,
最后返回:

最终,等待队列的结构如下:

9. ThreadC被唤醒后继续向下执行

ThreadC被唤醒后,继续执行,并进入下一次自旋,下一次自旋时,会成功获取到读锁。

注意,此时ThreadB和ThreadC已经拿到了读锁,ThreadD(写线程)和ThreadE(读线程)依然阻塞中,原来ThreadC对应的结点是个孤立结点,会被GC回收。

最终,等待队列的结构如下:

10. ThreadB和ThreadC释放读锁

ThreadB和ThreadC调用unlockRead方法释放读锁,CAS操作State将读锁数量减1:

注意,当读锁的数量变为0时才会调用release方法,唤醒队首结点:

队首结点(ThreadD写结点被唤醒),最终等待队列的结构如下:

11. ThreadD被唤醒后继续向下执行

ThreadD会从原阻塞处继续向下执行,并在下一次自旋中获取到写锁,然后返回:

最终,等待队列的结构如下:

12. ThreadD调用unlockWrite释放写锁

ThreadD释放写锁的过程和步骤7完全相同,会调用unlockWrite唤醒队首结点(ThreadE)。

ThreadE被唤醒后会从原阻塞处继续向下执行,但由于ThreadE是个读结点,所以同时会唤醒cowait栈中的所有读结点,过程和步骤8完全一样。最终,等待队列的结构如下:

至此,全部执行完成。

四、StampedLock类/方法声明

参考Oracle官方文档:https://docs.oracle.com/javas...
类声明:

方法声明:

五、StampedLock总结

StampedLock的等待队列与RRW的CLH队列相比,有以下特点:

当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;

当入队一个线程时,如果队尾是写结点,则直接链接到队尾;

唤醒线程的规则和AQS类似,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。

另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76548.html

相关文章

  • Java线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java线程进阶(三)—— J.U.Clocks框架:ReentrantLock

    摘要:公平策略在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。使用方式的典型调用方式如下二类原理的源码非常简单,它通过内部类实现了框架,接口的实现仅仅是对的的简单封装,参见原理多线程进阶七锁框架独占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首发于一世流云的专栏:https...

    jasperyang 评论0 收藏0
  • Java线程进阶(八)—— J.U.Clocks框架:AQS的Conditon等待(3)

    摘要:关于接口的介绍,可以参见多线程进阶二锁框架接口。最终线程释放了锁,并进入阻塞状态。当线程被通知唤醒时,则是将条件队列中的结点转换成等待队列中的结点,之后的处理就和独占功能完全一样。 showImg(https://segmentfault.com/img/remote/1460000016012490); 本文首发于一世流云的专栏:https://segmentfault.com/bl...

    ityouknow 评论0 收藏0
  • Java线程进阶(二)—— J.U.Clocks框架:接口

    摘要:二接口简介可以看做是类的方法的替代品,与配合使用。当线程执行对象的方法时,当前线程会立即释放锁,并进入对象的等待区,等待其它线程唤醒或中断。 showImg(https://segmentfault.com/img/remote/1460000016012601); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 本系列文章中所说的juc-...

    dkzwm 评论0 收藏0
  • Java线程进阶(四)—— J.U.Clocks框架:ReentrantReadWriteLoc

    摘要:我们知道,的作用其实是对类的和的增强,是为了让线程在指定对象上等待,是一种线程之间进行协调的工具。当线程调用对象的方法时,必须拿到和这个对象关联的锁。 showImg(https://segmentfault.com/img/remote/1460000016012566); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 一、Reentr...

    kumfo 评论0 收藏0

发表评论

0条评论

libxd

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<