资讯专栏INFORMATION COLUMN

Java多线程进阶(十九)—— J.U.C之synchronizer框架:CyclicBarrier

tulayang / 2658人阅读

摘要:当到达栅栏后,由于没有满足总数的要求,所以会一直等待,当线程到达后,栅栏才会放行。任务其实就是当最后一个线程到达栅栏时,后续立即要执行的任务。

本文首发于一世流云专栏:https://segmentfault.com/blog...
一、CyclicBarrier简介

CyclicBarrier是一个辅助同步器类,在JDK1.5时随着J.U.C一起引入。

这个类的功能和我们之前介绍的CountDownLatch有些类似。我们知道,CountDownLatch是一个倒数计数器,在计数器不为0时,所有调用await的线程都会等待,当计数器降为0,线程才会继续执行,且计数器一旦变为0,就不能再重置了。

CyclicBarrier可以认为是一个栅栏,栅栏的作用是什么?就是阻挡前行。
顾名思义,CyclicBarrier是一个可以循环使用的栅栏,它做的事情就是:
让线程到达栅栏时被阻塞(调用await方法),直到到达栅栏的线程数满足指定数量要求时,栅栏才会打开放行。

这其实有点像军训报数,报数总人数满足教官认为的总数时,教官才会安排后面的训练。

可以看下面这个图来理解下:
一共4个线程A、B、C、D,它们到达栅栏的顺序可能各不相同。当A、B、C到达栅栏后,由于没有满足总数【4】的要求,所以会一直等待,当线程D到达后,栅栏才会放行。

从CyclicBarrier的构造器,我们也可以看出关于这个类的一些端倪,CyclicBarrier有两个构造器:

构造器一:

这个构造器的参数parties就是之前说的需要满足的计数总数。

构造器二:

这个构造器稍微特殊一些,除了指定了计数总数外,传入了一个Runnable任务。

Runnable任务其实就是当最后一个线程到达栅栏时,后续立即要执行的任务。

比如,军训报数完毕后,总人数满足了要求,教官就会开始命令大家执行下一个任务,这个【下一个任务】就是这里的Runnable。
二、CyclicBarrier示例

我们来看一个CyclicBarrier的示例,来理解下它的功能。

假设现在有这样一个场景:
5个运动员准备跑步比赛,运动员在赛跑前会准备一段时间,当裁判发现所有运动员准备完毕后,就举起发令枪,比赛开始。
这里的起跑线就是屏障,运动员必须在起跑线等待其他运动员准备完毕。
public class CyclicBarrierTest {
    public static void main(String[] args) {

        int N = 5;  // 运动员数
        CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println("****** 所有运动员已准备完毕,发令枪:跑!******");
            }
        });

        for (int i = 0; i < N; i++) {
            Thread t = new Thread(new PrepareWork(cb), "运动员[" + i + "]");
            t.start();
        }

    }


    private static class PrepareWork implements Runnable {
        private CyclicBarrier cb;

        PrepareWork(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {

            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName() + ": 准备完成");
                cb.await();          // 在栅栏等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

执行上面的程序,可能的输出结果如下:

运动员[3]: 准备完成
运动员[1]: 准备完成
运动员[0]: 准备完成
运动员[2]: 准备完成
运动员[4]: 准备完成
****** 所有运动员已准备完毕,发令枪:跑!******

从输出可以看到,线程到达栅栏时会被阻塞(调用await方法),直到到达栅栏的线程数满足指定数量要求时,栅栏才会打开放行。

CyclicBarrier对异常的处理

我们知道,线程在阻塞过程中,可能被中断,那么既然CyclicBarrier放行的条件是等待的线程数达到指定数目,万一线程被中断导致最终的等待线程数达不到栅栏的要求怎么办?

CyclicBarrier一定有考虑到这种异常情况,不然其它所有等待线程都会无限制地等待下去。
那么CyclicBarrier是如何处理的呢?

我们看下CyclicBarrier的await()方法:

public int await() throws InterruptedException, BrokenBarrierException {
    //...
}

可以看到,这个方法除了抛出InterruptedException异常外,还会抛出BrokenBarrierException

BrokenBarrierException表示当前的CyclicBarrier已经损坏了,可能等不到所有线程都到达栅栏了,所以已经在等待的线程也没必要再等了,可以散伙了。

出现以下几种情况之一时,当前等待线程会抛出BrokenBarrierException异常:

其它某个正在await等待的线程被中断了

其它某个正在await等待的线程超时了

某个线程重置了CyclicBarrier(调用了reset方法,后面会讲到)

另外,只要正在Barrier上等待的任一线程抛出了异常,那么Barrier就会认为肯定是凑不齐所有线程了,就会将栅栏置为损坏(Broken)状态,并传播BrokenBarrierException给其它所有正在等待(await)的线程。

我们来对上面的例子做个改造,模拟下异常情况:

public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {

        int N = 5;  // 运动员数
        CyclicBarrier cb = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println("****** 所有运动员已准备完毕,发令枪:跑!******");
            }
        });

        List list = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            Thread t = new Thread(new PrepareWork(cb), "运动员[" + i + "]");
            list.add(t);
            t.start();
            if (i == 3) {
                t.interrupt();  // 运动员[3]置中断标志位
            }
        }

        Thread.sleep(2000);
        System.out.println("Barrier是否损坏:" + cb.isBroken());
    }


    private static class PrepareWork implements Runnable {
        private CyclicBarrier cb;

        PrepareWork(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ": 准备完成");
                cb.await();
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + ": 被中断");
            } catch (BrokenBarrierException e) {
                System.out.println(Thread.currentThread().getName() + ": 抛出BrokenBarrierException");
            }
        }
    }
}

可能的输出结果:

运动员[0]: 准备完成
运动员[2]: 准备完成
运动员[1]: 准备完成
运动员[3]: 准备完成
运动员[3]: 被中断
运动员[4]: 准备完成
运动员[4]: 抛出BrokenBarrierException
运动员[0]: 抛出BrokenBarrierException
运动员[1]: 抛出BrokenBarrierException
运动员[2]: 抛出BrokenBarrierException
Barrier是否损坏:true

这段代码,模拟了中断线程3的情况,从输出可以看到,线程0、1、2首先到达Brrier等待。
然后线程3到达,由于之前设置了中断标志位,所以线程3抛出中断异常,导致Barrier损坏,此时所有已经在栅栏等待的线程(0、1、2)都会抛出BrokenBarrierException异常。
此时,即使再有其它线程到达栅栏(线程4),都会抛出BrokenBarrierException异常。

注意:使用CyclicBarrier时,对异常的处理一定要小心,比如线程在到达栅栏前就抛出异常,此时如果没有重试机制,其它已经到达栅栏的线程会一直等待(因为没有还没有满足总数),最终导致程序无法继续向下执行。
三、CyclicBarrier原理 CyclicBarrier的构造

CyclicBarrier有两个构造器:
CyclicBarrier cb = new CyclicBarrier(10);

构造器内部的各个字段含义如下:

字段名 作用
parties 栅栏开启需要的到达线程总数
count 剩余未到达的线程总数
barrierCommand 最后一个线程到达后执行的任务
CyclicBarrier的内部结构

CyclicBarrier 并没有自己去实现AQS框架的API,而是利用了ReentrantLockCondition

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();

    // 栅栏开启需要的到达线程总数
    private final int parties;

    // 最后一个线程到达后执行的任务
    private final Runnable barrierCommand;

    // 剩余未到达的线程总数
    private int count;

    // 当前轮次的运行状态
    private Generation generation = new Generation();

    // ...
}

需要注意的是generation这个字段:

我们知道,CyclicBarrier 是可以循环复用的,所以CyclicBarrier 的每一轮任务都需要对应一个generation 对象。
generation 对象内部有个broken字段,用来标识当前轮次的CyclicBarrier 是否已经损坏。
nextGeneration方法用来创建一个新的generation 对象,并唤醒所有等待线程,重置内部参数。

CyclicBarrier的核心方法

我们先来看下await方法:

可以看到,无论有没有超时功能,内部都是调了dowait这个方法:

dowait方法并不复杂,一共有3部分:

判断栅栏是否已经损坏或当前线程已经被中断,如果是会分别抛出异常;

如果当前线程是最后一个到达的线程,会尝试执行最终任务(如果构造CyclicBarrier对象时有传入Runnable的话),执行成功即返回,失败会破坏栅栏;

对于不是最后一个到达的线程,会在Condition队列上等待,为了防止被意外唤醒,这里用了一个自旋操作。

破坏栅栏用的是breakBarrier方法:

再来看下CyclicBarrierreset方法:

该方法先破坏栅栏,然后开始下一轮(新建一个generation对象)。

四、CyclicBarrier接口/类声明

类声明:

构造器声明:

接口声明:

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76618.html

相关文章

  • Java线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java线程进阶(十八)—— J.U.Csynchronizer框架:CountDownLatc

    摘要:线程可以调用的方法进入阻塞,当计数值降到时,所有之前调用阻塞的线程都会释放。注意的初始计数值一旦降到,无法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首发于一世流云的专栏:https://segmentfault.com/blog... 一、CountDownLatch简介 CountDow...

    Elle 评论0 收藏0
  • Java线程进阶(二二)—— J.U.Csynchronizer框架:Phaser

    摘要:分层支持分层一种树形结构,通过构造函数可以指定当前待构造的对象的父结点。当一个的参与者数量变成时,如果有该有父结点,就会将它从父结点中溢移除。当首次将某个结点链接到树中时,会同时向该结点的父结点注册一个参与者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首发于一世流云专栏:https://segme...

    Mr_zhang 评论0 收藏0
  • Java线程进阶(六)—— J.U.Clocks框架:AQS综述(1)

    摘要:在时,引入了包,该包中的大多数同步器都是基于来构建的。框架提供了一套通用的机制来管理同步状态阻塞唤醒线程管理等待队列。指针用于在结点线程被取消时,让当前结点的前驱直接指向当前结点的后驱完成出队动作。 showImg(https://segmentfault.com/img/remote/1460000016012438); 本文首发于一世流云的专栏:https://segmentfau...

    cocopeak 评论0 收藏0
  • 小马哥Java面试题课程总结

    摘要:无限期等待另一个线程执行特定操作。线程安全基本版请说明以及的区别值都不能为空数组结构上,通过数组和链表实现。优先考虑响应中断,而不是响应锁的普通获取或重入获取。只是在最后获取锁成功后再把当前线程置为状态然后再中断线程。 前段时间在慕课网直播上听小马哥面试劝退(面试虐我千百遍,Java 并发真讨厌),发现讲得东西比自己拿到offer还要高兴,于是自己在线下做了一点小笔记,供各位参考。 课...

    FingerLiu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<