资讯专栏INFORMATION COLUMN

ReentrantLock 类

mykurisu / 993人阅读

摘要:在多线程中可以使用关键字来实现多线程之间同步互斥但在中新增加了类也能达到同样的效果并且在扩展功能上也更加强大比如具有嗅探锁定多路分支通知公平锁和非公平锁等默认功能而且在使用上也比更加的灵活使用实现同步调用对象的方法获取锁调用方法释放锁从运行

在 Java 多线程中, 可以使用 synchronized 关键字来实现多线程之间同步互斥, 但在 JDK 1.5 中新增加了 ReentrantLock 类也能达到同样的效果, 并且在扩展功能上也更加强大, 比如具有嗅探锁定, 多路分支通知, 公平锁和非公平锁等(默认)功能, 而且在使用上也比 synchronized 更加的灵活.

使用 ReentrantLock 实现同步
public class MyService {

    private Lock lock = new ReentrantLock();

    public void testMethod() {
        lock.lock();

        for (int i = 0; i < 10; i++){
            System.out.println("ThreadName=" + Thread.currentThread().getName() + (" " + (i + 1)));
        }

        lock.unlock();

    }

}
public class MyThread extends Thread {

    private MyService myService;
    public MyThread(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        myService.testMethod();
    }
}
    public static void main(String[] args) throws IOException, InterruptedException {

        MyService myService = new MyService();

        MyThread myThreadA = new MyThread(myService);
        MyThread myThreadB = new MyThread(myService);
        MyThread myThreadC = new MyThread(myService);
        MyThread myThreadD = new MyThread(myService);
        MyThread myThreadE = new MyThread(myService);

        myThreadA.start();
        myThreadB.start();
        myThreadC.start();
        myThreadD.start();
        myThreadE.start();

    }

调用 ReentrantLock 对象的 lock() 方法获取锁, 调用 unLock() 方法释放锁.

从运行结果来看, 当前线程打印完毕之后将锁进行释放, 其他的线程才可以继续打印. 线程打印的数据是分组打印, 因为当前线程已经持有锁, 但线程之间打印的顺序是随机的.

使用 Condition 实现等待/通知

关键字 synchronizedwait()notify() / notifyall() 方法结合可以实现等待/通知模式, 只不过在使用时, 调用 notify() 方法 JVM 会随机选择一个 WAITNG 状态的线程来执行.

而使用 Condition 则可以更加灵活, 可以实现 "选择性通知", 可以指定的选择唤醒哪些线程, 哪些线程继续等待.

public class MyService {

    private Lock lock = new ReentrantLock();
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();

    public void awaitA() throws InterruptedException {
        lock.lock();

        System.out.println("begin awaitA 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        conditionA.await();

        System.out.println("end awaitA 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        lock.unlock();
    }

    public void awaitB() throws InterruptedException {
        lock.lock();

        System.out.println("begin awaitB 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        conditionB.await();

        System.out.println("end awaitB 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        lock.unlock();
    }

    public void  signalAll_A() throws InterruptedException {
        lock.lock();
        System.out.println("begin signalAll_A 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        conditionA.signalAll();

        lock.unlock();
    }

    public void  signalAll_B() throws InterruptedException {
        lock.lock();
        System.out.println("begin signalAll_B 时间" + System.currentTimeMillis() + "ThreadName=" + Thread.currentThread().getName());

        conditionB.signalAll();

        lock.unlock();
    }
}
public class ThreadA extends Thread {

    private MyService myService;
    public ThreadA(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        try {
            myService.awaitA();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class ThreadB extends Thread {

    private MyService myService;

    public ThreadB(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        try {
            myService.awaitB();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
    public static void main(String[] args) throws IOException, InterruptedException {

        MyService myService = new MyService();

        ThreadA threadA = new ThreadA(myService);
        threadA.setName("a");
        threadA.start();

        ThreadB threadB = new ThreadB(myService);
        threadB.setName("b");
        threadB.start();

        Thread.sleep(3000);
        myService.signalAll_A();

    }

Object 类中的 wait() 方法相当于 Condition 类中的 await() 方法.

Object 类中的 wait(long timeout) 方法相当于 Condition 类中的 await(long time, TimeUnit unit) 方法.

Object 类中的 notify() 方法相当于 Condition 类中的 signal() 方法.

Object 类中的 notifyAll() 方法相当于 Condition 类中的 signalAll() 方法.

从执行结果来看, a 和 b 线程被暂停, 当执行 myService.signalAll_A() 方法时, a 线程继续执行, 而 b 线程仍然是等待状态.

源码

ReentrantLock 类实现了 Lock, java.io.Serializable

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

两个构造方法, 默认是非公平锁, 如果为 true 表明是公平锁.

可以看到 NonfairSyncFairSync 都是继承了 Sync 这个抽象类, 而 Sync 则继承了AQS. SyncNonfairSyncFairSync 都是 ReentrantLock 的静态内部类, ReentrantLock 的许多方法都是Sync类代为实现.

AbstractQueuedSynchronizer 核心方法

AQS最核心的数据结构是一个 volatile int state 和 一个 FIFO 线程等待对列.

state 代表共享资源的数量, 如果是互斥访问, 一般设置为1, 而如果是共享访问, 可以设置为N(N为可共享线程的个数);

而线程等待队列是一个双向链表, 无法立即获得锁而进入阻塞状态的线程会加入队列的尾部. 当然对 state 以及队列的操作都是采用了 volatile + CAS + 自旋 的操作方式, 采用的是乐观锁的概念.

acquire 方法

此方法是独占模式下线程获取共享资源的顶层入口.

public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&    //尝试获取锁,若获取成功,则state减1,返回true
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //若获取锁不成功,调用addWaiter方法使线程进入等待队列,acquireQueued方法让线程进入阻塞状态
            selfInterrupt(); //检查在等待过程中是否有中断,若有中断,则在此时再响应
}
tryAcquire方法
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

为什么要抛出异常而不是声明为抽象类呢?
因为AQS是可选模式的, 我们选择的是独占模式, 就不需要去重写 tryAcquireShared 方法, 如果我们选的是共享模式, 也不需要重写 tryAcquire 方法, 因此AQS虽然是抽象类, 但是没有抽象方法, 而是用抛出异常的方式代替.

addWaiter 方法

addWaiter方法的主要是把当前线程加入到FIFO等待队列队尾.

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//创建节点
        // 首先尝试快速插入队尾
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {//CAS操作
                pred.next = node;
                return node;
            }
        }
        enq(node);//若不成功,则尝试以自旋方式插入队尾
        return node;
}
private Node enq(final Node node) {
        for (;;) {//自旋方式不断尝试插入队尾,直至成功为止
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}
acquireQueued 方法

acquireQueued 方法, 主要是让加入队尾的线程进入等待状态, 等到前面的进程执行完了, 再唤醒该线程, 去执行同步代码在这里是检测是否应该park()(park是一个Unsafe包中的native方法), 以及检测在队列的等待过程中是否有中断, 在等待过程中是不响应中断的, 等到等待结束被唤醒时, 才去向上传递是否中断过的值.

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//自旋
                final Node p = node.predecessor();//获取前驱节点
                if (p == head && tryAcquire(arg)) {//若前驱节点是头节点,便可以尝试去获取资源,若获取到资源,则进行下面的队列修改
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && //检测是否需要等待及找到一个前驱未放弃的节点,连接在后面
                    parkAndCheckInterrupt()) //等待,并且等到等待结束,返回是否被中断过
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
常用方法 ReentrantLock 类

int getHoldCount() 查询调用 lock() 方法的次数.

final int getQueueLength() 估计等待锁的线程数. 比如有5个线程, 1个线程首先执行 await() 方法, 那么在调用此方法后返回值是4, 说明有4个线程同时在等待lock的释放.

int getWaitQueueLength(Condition condition) 返回与此锁相关联给定条件等待的线程数的估计. 比如有5个线程, 每个线程都执行了同一个 condition 对象的 await() 方法, 则调用此方法时返回的值是5.

final boolean hasQueuedThreads() 判断是否有线程等待此锁.

final boolean hasQueuedThread(Thread thread) 判断指定线程是否等待获取此锁.

boolean hasWaiters(Condition condition) 判断线程有没有调用 await() 方法.

void lockInterruptibly() throws InterruptedException 获取锁, 除非当前线程为interrupted.

Condition 类

void awaitUninterruptibly()await() 区别就是当调用 interrupt() 方法时不会抛出 InterrputedException 异常.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74007.html

相关文章

  • Java多线程进阶(三)—— J.U.C之locks框架:ReentrantLock

    摘要:公平策略在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。使用方式的典型调用方式如下二类原理的源码非常简单,它通过内部类实现了框架,接口的实现仅仅是对的的简单封装,参见原理多线程进阶七锁框架独占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首发于一世流云的专栏:https...

    jasperyang 评论0 收藏0
  • Java 重入锁 ReentrantLock 原理分析

    摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...

    lx1036 评论0 收藏0
  • J.U.C|可重入锁ReentrantLock

    摘要:二什么是重入锁可重入锁,顾名思义,支持重新进入的锁,其表示该锁能支持一个线程对资源的重复加锁。将由最近成功获得锁,并且还没有释放该锁的线程所拥有。可以使用和方法来检查此情况是否发生。 一、写在前面 前几篇我们具体的聊了AQS原理以及底层源码的实现,具体参见 《J.U.C|一文搞懂AQS》《J.U.C|同步队列(CLH)》《J.U.C|AQS独占式源码分析》《J.U.C|AQS共享式源...

    wangdai 评论0 收藏0
  • Lock锁子了解一下

    摘要:前言回顾前面多线程三分钟就可以入个门了源码剖析多线程基础必要知识点看了学习多线程事半功倍锁机制了解一下简简单单过一遍只有光头才能变强上一篇已经将锁的基础简单地过了一遍了,因此本篇主要是讲解锁主要的两个子类那么接下来我们就开始吧一锁首先我们来 前言 回顾前面: 多线程三分钟就可以入个门了! Thread源码剖析 多线程基础必要知识点!看了学习多线程事半功倍 Java锁机制了解一下 AQ...

    时飞 评论0 收藏0
  • Java同步机制的底层实现

    摘要:在多线程编程中我们会遇到很多需要使用线程同步机制去解决的并发问题,而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这五个方法之所以能指定同步器的行为,则是因为中的其他方法就是通过对这五个方法的调用来实现的。 在多线程编程中我们会遇到很多需要使用线程同步机制去解决的并发问题,而这些同步机制就是多线程编程中影响正确性和运行效率的重中之重。这不禁让我感到好奇,这些同步机制是如何...

    yintaolaowanzi 评论0 收藏0

发表评论

0条评论

mykurisu

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<