资讯专栏INFORMATION COLUMN

Java多线程之自旋锁与队列锁

Sunxb / 1978人阅读

摘要:队列锁就是将线程组织成一个队列,让每个线程在不同的存储单元上旋转,从而降低一致性流量。队列锁队列锁表示为对象的链表,每个线程通过一个线程局部变量指向其前驱。

编写高效的并发程序,需要对互斥问题重新研究,设计出适用于多线程的互斥协议。那么问题来了,如果不能获得锁,应该怎么做?

旋转:继续进行尝试,如自旋锁,延迟较短;

阻塞:挂起自己,请求调度器切换到另一个线程,代价较大。

综合来看,先旋转一小段时间再阻塞,是种不错的选择。

java.util.concurrent.locks.Lock接口提供了lock()unlock()两个重要的方法,用于解决实际互斥问题。

</>复制代码

  1. Lock mutex = new MyLock();
  2. mutex.lock();
  3. try {
  4. do something
  5. } finally {
  6. mutex.unlock();
  7. }
测试-设置锁

测试-设置来源于getAndSet()操作,通过一个原子布尔型状态变量的值判断当前锁的状态。若为true表示锁忙,若为false表示锁空闲。

</>复制代码

  1. public class TASLock {
  2. AtomicBoolean state = new AtomicBoolean(false);
  3. public void lock() {
  4. while (state.getAndSet(true)) {
  5. ;
  6. }
  7. }
  8. public void unlock() {
  9. state.set(false);
  10. }
  11. }
测试-测试-设置锁

TTASLock是升级版的TASLock算法,没有直接调用getAndSet()方法,而是在锁看起来空闲(state.get()返回false)时才调用。

</>复制代码

  1. public class TTASLock {
  2. AtomicBoolean state = new AtomicBoolean(false);
  3. public void lock() {
  4. while (true) {
  5. while (state.get()) {
  6. ;
  7. }
  8. if (! state.getAndSet(true)) {
  9. return;
  10. }
  11. }
  12. }
  13. public void unlock() {
  14. state.set(false);
  15. }
  16. }

这两个算法都能保证无死锁的互斥,但是TTASLock的性能会比TASLock高许多。

可以从计算机系统结构的高速缓存和局部性来解释这个问题,每个处理器都有一个cache,cache的访问速度比内存快好几个数量级。当cache命中时,会立即加载这个值;当cache缺失时,会在内存或两一个处理器的cache中寻找这个数据。寻找的过程比较漫长,处理器在总线上广播这个地址,其他处理器监听总线。若其他处理器在自己的cache中发现这个地址,则广播该地址及其值来做出响应。若所有处理器都没发现这个地址,则以内存地址及其所对应的值进行响应。

getAndSet()的直接调用让TASLock性能损失许多:

getAndSet()的调用实质是总线上的一个广播,这个调用将会延迟所有的线程,因为所有线程都要通过监听总线通信。

getAndSet()的调用会更新state的值,即使值仍为true,但是其他处理器cache中的锁副本将会被丢弃,从而导致cache缺失。

当持有锁的线程试图释放锁时可能被延迟,因为总线被正在自旋的线程独占。

与此相反,对于TTASLock算法采用的是本地旋转(线程反复地重读被缓存的值而不是反复地使用总线),线程A持有锁时,线程B尝试获得锁,但线程B只会在第一次读锁是cache缺失,之后每次cache命中不产生总线流量。

那么缺点来了,TTASLock释放锁时,会使各自旋线程处理器中的cache副本立即失效,这些线程会重新读取这个值,造成总线流量风暴。

指数后退锁

对于TTASLock算法,当锁看似空闲(state.get()返回false)时,存在高争用(多个线程试图同时获取一个锁)的可能。高争用意味着获取锁的可能性小,并且会造成总线流量增加。线程在重试之前回退一段时间是种不错的选择。

这里实现的指数回退算法的回退准则是,不成功尝试的次数越多,发生争用的可能性就越高,线程需要后退的时间就应越长。

</>复制代码

  1. public class BackoffLock {
  2. private AtomicBoolean state = new AtomicBoolean(false);
  3. private static final int MIN_DELAY = 10;
  4. private static final int MAX_DELAY = 100;
  5. public void lock() {
  6. Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
  7. while (true) {
  8. while (state.get()) {
  9. ;
  10. }
  11. if (! state.getAndSet(true)) {
  12. return;
  13. } else {
  14. backoff.backoff();
  15. }
  16. }
  17. }
  18. public void unlock() {
  19. state.set(false);
  20. }
  21. }
  22. class Backoff {
  23. private final int minDelay, maxDelay;
  24. int limit;
  25. final Random random;
  26. public Backoff(int min, int max) {
  27. minDelay = min;
  28. maxDelay = max;
  29. limit = minDelay;
  30. random = new Random();
  31. }
  32. public void backoff() {
  33. int delay = random.nextInt(limit);
  34. limit = Math.min(maxDelay, 2 * limit);
  35. try {
  36. Thread.sleep(delay);
  37. } catch (InterruptedException e) {
  38. ;
  39. }
  40. }
  41. }

指数后退算法解决了TTASLock释放锁时的高争用问题,但是它的性能与minDelaymaxDelay的选取密切相关,并且很难找到一个通用兼容的值。

另外,BackoffLock算法还有两个问题:

cache一致性流量:所有线程都在同一个共享存储单元上旋转;

临界区利用率低:后退时间无法确定,线程延迟可能过长。

基于数组的锁

下面的这些是队列锁,名字看上去奇形怪状的,其实是发明者名字的首字母。队列锁就是将线程组织成一个队列,让每个线程在不同的存储单元上旋转,从而降低cache一致性流量。

基于循环数组实现队列锁ALock,每个线程检测自己的slot对应的flag[]域来判断是否轮到自己。

一个线程想获得锁,就要调用lock()方法,获得自增tail获得分配的slot号,然后等待这个slot空闲;当释放锁时,就要阻塞当前slot,然后让下一个slot可运行。
flag[i]true时,那么这个线程就有权获得锁。任意时刻的flag[]数组中,应该只有一个slot的值为true

</>复制代码

  1. public class ALock {
  2. ThreadLocal mySlotIndex = new ThreadLocal();
  3. AtomicInteger tail;
  4. volatile boolean [] flag;
  5. int size;
  6. public ALock() {
  7. size = 100;
  8. tail = new AtomicInteger(0);
  9. flag = new boolean[size];
  10. flag[0] = true;
  11. }
  12. public void lock() {
  13. int slot = tail.getAndIncrement() % size;
  14. mySlotIndex.set(slot);
  15. while (! flag[slot]) {
  16. ;
  17. }
  18. }
  19. public void unlock() {
  20. int slot = mySlotIndex.get();
  21. flag[slot] = false;
  22. flag[(slot + 1) % size] = true;
  23. }
  24. }

mySlotIndex是线程的局部变量,只能被一个线程访问,每个线程都有自己独立初始化的副本。不需要保存在共享存储器,不需要同步,不会产生一致性流量。使用get()set()方法来访问局部变量的值。

tail是常规变量,域被所有的线程共享,支持原子操作。

数组flag[]也是被多个线程共享的,但是每个线程都是在一个数组元素的本地cache副本上旋转。

ALock对BackoffLock的改进:在多个共享存储单元上旋转,将cache无效性降到最低;把一个线程释放锁和另一个线程获得该锁之间的时间间隔最小化;先来先服务的公平性。但是,数组的大小至少与最大的并发线程数相同,并不是空间有效的,当并发线程最大个数为n时,同步L个不同对象就需要O(Ln)大小的空间。

CLH队列锁

CLH队列锁表示为QNode对象的链表,每个线程通过一个线程局部变量pred指向其前驱。每个线程通过检测前驱结点的locked域来判断是否轮到自己。如果该域为true,则前驱线程要么已经获得锁要么正在等待锁;如果该域为false,则前驱进程已释放锁,轮到自己了。正常情况下,队列链中只有一个结点的locked域为false

当一个线程调用lock()方法想获得锁时,将自己的locked域置为true,表示该线程不准备释放锁,然后并将自己的结点加入到队列链尾部。最后就是在前驱的locked域上旋转,等待前驱释放锁。当这个线程调用unlock()方法要释放锁时,线程要将自己的locked域置为false,表示已经释放锁,然后将前驱结点作为自己的新结点以便日后访问。

那么问题来了,为什么要在释放锁时做myNode.set(myPred.get())这个处理呢?假设线程A释放锁,A的后继结点为B,如果不使用这种处理方式,A在释放锁后马上申请锁将自己的locked域置为true,整个动作在B检测到前驱A释放锁之前,那么将发生死锁。

</>复制代码

  1. public class CLHLock {
  2. AtomicReference tail;
  3. ThreadLocal myPred;
  4. ThreadLocal myNode;
  5. public CLHLock() {
  6. tail = new AtomicReference(new QNode());
  7. myPred = new ThreadLocal() {
  8. protected QNode initialValue() {
  9. return null;
  10. }
  11. };
  12. myNode = new ThreadLocal() {
  13. protected QNode initialValue() {
  14. return new QNode();
  15. }
  16. };
  17. }
  18. public void lock() {
  19. QNode qnode = myNode.get();
  20. qnode.locked = true;
  21. QNode pred = tail.getAndSet(qnode);
  22. myPred.set(pred);
  23. while (pred.locked) {
  24. ;
  25. }
  26. }
  27. public void unlock() {
  28. QNode qnode = myNode.get();
  29. qnode.locked = false;
  30. myNode.set(myPred.get());
  31. }
  32. class QNode {
  33. boolean locked = false;
  34. }
  35. }

如果最大线程数为n,有L个不同对象,那么CLHLock只需要O(L+n)空间。比ALock所需空间少,并且不需要知道可能使用锁的最大线程数量。但是,在无cache的系统上性能较差,因为一次要访问两个结点,若这两个结点内存位置较远,性能损失会很大。

MCS队列锁

MCS队列锁通过检测自己所在结点的locked的值来判断是否轮到自己,等待这个域被前驱释放锁时改变。

线程若要获得锁,需把自己结点添加到链表的尾部。若队列链表原先为空,则获得锁。否则,将前驱结点的next域指向自己,在自己的locked域上自旋等待,直到前驱将该域置为false。线程若要释放锁,判断是否在队尾,如果是只需将tail置为null,如果不是需将后继的locked域置为false且将自己结点的next域置为默认的null。注意在队尾的情况,可能有个线程正在获得锁,要等一下变为后一种情况。

</>复制代码

  1. public class MCSLock {
  2. AtomicReference tail;
  3. ThreadLocal myNode;
  4. public MCSLock() {
  5. tail = new AtomicReference(null);
  6. myNode = new ThreadLocal() {
  7. protected QNode initialValue() {
  8. return new QNode();
  9. }
  10. };
  11. }
  12. public void lock() {
  13. QNode qnode = myNode.get();
  14. QNode pred = tail.getAndSet(qnode);
  15. if (pred != null) {
  16. qnode.locked = true;
  17. pred.next = qnode;
  18. while (qnode.locked) {
  19. ;
  20. }
  21. }
  22. }
  23. public void unlock() {
  24. QNode qnode = myNode.get();
  25. if (qnode.next == null) {
  26. if (tail.compareAndSet(qnode, null)) {
  27. return;
  28. }
  29. while (qnode.next == null) {
  30. ;
  31. }
  32. }
  33. qnode.next.locked = false;
  34. qnode.next = null;
  35. }
  36. class QNode {
  37. boolean locked = false;
  38. QNode next = null;
  39. }
  40. }

结点能被重复使用,该锁的空间复杂度为O(L+n)。MCSLock算法适合无cache的系统结构,因为每个线程只需控制自己自旋的存储单元。但是,释放锁也需要旋转,另外读写比较次数比CLHLock多。

时限队列锁

Lock接口中有个一个tryLock()方法,可以指定一个时限(获得锁可等待的最大时间),若获得锁超时则放弃尝试。最后返回一个布尔值说明锁是否申请成功。

时限队列锁TOLock是基于CLHLock类的,锁是一个结点的虚拟队列,每个结点在它的前驱结点上自旋等待锁释放。但是当这个线程超时时,不能简单的抛弃它的队列结点,而是将这个结点标记为已废弃,这样它的后继们(如果有)就会注意到这个结点已放弃,转而在放弃结点的前驱上自旋。为了保证队列链表的连续性,每次申请锁都会new一个QNode

时限队列锁结点的域pred会特殊一点,它有3种类型的取值:

null:初始状态,未获得锁或已释放锁;

AVAILABLE:一个静态结点,表示对应结点已释放锁,申请锁成功;

QNodepred域为null的前驱结点,表示对应结点因超时放弃锁请求,在放弃请求时才会设置这个值。

申请锁时,创建一个pred域为null的新结点并加入到链表尾部,若原先链表为空或前驱结点已释放锁,则获得锁。否则,在尝试时间内,找到pred域为null的前驱结点,等待它释放锁。若在等待前驱结点释放锁的过程中超时,就尝试从链表中删除这个结点,要分这个结点是否有后继两种情况。
释放锁时,检查该结点是否有后继,若无后继可直接把tail设置为null,否则将该结点的pred域指向AVAILABLE

</>复制代码

  1. public class TOLock {
  2. static QNode AVAILABLE = new QNode();
  3. AtomicReference tail;
  4. ThreadLocal myNode;
  5. public TOLock() {
  6. tail = new AtomicReference(null);
  7. myNode = new ThreadLocal();
  8. }
  9. public boolean tryLock(long time, TimeUnit unit) {
  10. long startTime = System.currentTimeMillis();
  11. long patience = TimeUnit.MILLISECONDS.convert(time, unit);
  12. QNode qnode = new QNode();
  13. myNode.set(qnode);
  14. qnode.pred = null;
  15. QNode myPred = tail.getAndSet(qnode);
  16. if (myPred == null || myPred.pred == AVAILABLE) {
  17. return true;
  18. }
  19. while (System.currentTimeMillis() - startTime < patience) {
  20. QNode predPred = myPred.pred;
  21. if (predPred == AVAILABLE) {
  22. return true;
  23. } else {
  24. if (predPred != null) {
  25. myPred = predPred;
  26. }
  27. }
  28. }
  29. if (! tail.compareAndSet(qnode, myPred)) {
  30. qnode.pred = myPred;
  31. }
  32. return false;
  33. }
  34. public void unlock() {
  35. QNode qnode = myNode.get();
  36. if (! tail.compareAndSet(qnode, null)) {
  37. qnode.pred = AVAILABLE;
  38. }
  39. }
  40. static class QNode {
  41. public QNode pred = null;
  42. }
  43. }

优点:同CLHLock。
缺点:每次申请锁都要分配一个新结点,在锁上旋转的线程可能要回溯一个超时结点链。

上面实现的这些锁算法不支持重入。我们可以使用银行转账的例子来测试一下锁的效果,任意账户间可以随意转账,锁生效时所有账户的总金额是不变的。完整的算法实现和测试代码在这里。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65777.html

相关文章

  • Java中的以及sychronized实现机制

    摘要:有可能,会造成优先级反转或者饥饿现象。悲观锁在中的使用,就是利用各种锁。对于而言,其是独享锁。偏向锁,顾名思义,它会偏向于第一个访问锁的线程,大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。 理解锁的基础知识 如果想要透彻的理解java锁的来龙去脉,需要先了解以下基础知识。 基础知识之一:锁的类型 按照其性质分类 公平锁/非公平锁 公平锁是指多个线程按照申请锁的顺序来获...

    linkin 评论0 收藏0
  • Java Monitor(管程)

    摘要:当前线程使用将对象头的替换为锁记录指针,如果成功,当前线程获得锁如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。重量级锁是悲观锁的一种,自旋锁轻量级锁与偏向锁属于乐观锁。 操作系统在面对线程间同步的时候,会支持例如semaphore信号量和mutex互斥量等同步原语,而monitor是在编程语言中被实现的,下面介绍一下java中monitor(监视器/管程:管理共享变量以...

    caspar 评论0 收藏0
  • 不可不说的Java”事

    摘要:本文旨在对锁相关源码本文中的源码来自使用场景进行举例,为读者介绍主流锁的知识点,以及不同的锁的适用场景。中,关键字和的实现类都是悲观锁。自适应意味着自旋的时间次数不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。 前言 Java提供了种类丰富的锁,每种锁因其特性的不同,在适当的场景下能够展现出非常高的效率。本文旨在对锁相关源码(本文中的源码来自JDK 8)、使用场景...

    galaxy_robot 评论0 收藏0
  • Java 中15种的介绍:公平,可重入,独享,互斥,乐观,分段自旋等等

    摘要:公平锁非公平锁公平锁公平锁是指多个线程按照申请锁的顺序来获取锁。加锁后,任何其他试图再次加锁的线程会被阻塞,直到当前进程解锁。重量级锁会让其他申请的线程进入阻塞,性能降低。 Java 中15种锁的介绍 在读很多并发文章中,会提及各种各样锁如公平锁,乐观锁等等,这篇文章介绍各种锁的分类。介绍的内容如下: 公平锁 / 非公平锁 可重入锁 / 不可重入锁 独享锁 / 共享锁 互斥锁 / 读...

    LeoHsiun 评论0 收藏0
  • Java线程——带你看AQS框架源码

    摘要:作用是存储获取锁失败的阻塞线程。独占模式下,锁是线程独占的,而共享模式下,锁是可以被多个线程占用的。等方法就是让线程阻塞加入队列唤醒线程等。该方法其实就是自旋尝试获取锁或阻塞线程子类实现决定。 AQS,全称AbstractQueuedSynchronizer,是Concurrent包锁的核心,没有AQS就没有Java的Concurrent包。它到底是个什么,我们来看看源码的第一段注解是...

    stackvoid 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<