资讯专栏INFORMATION COLUMN

CircuitBreaker模式的Java实现

animabear / 591人阅读

摘要:序状态转换闭开在设定的时间窗口内失败次数达到阈值,由闭开。进入开的同时启动进入半开状态的定时器。半开状态的计数器目前半开状态没有使用时间窗口,仅仅使用连续成功次数来计算,一旦失败,则将断路器设置为状态。

状态转换

闭->开

在设定的时间窗口内失败次数达到阈值,由闭->开。

开->半开

在处于开的状态,对目标的调用做失败返回,进入开的时候,启动计时器,设定时间过后进入半开状态。

半开->开

进入半开状态,会启动一个计数器,记录连续成功的调用次数,超过阈值,进入闭状态。有一次失败则进入开状态,同时清零连续成功调用次数。进入开的同时启动进入半开状态的定时器。

半开->闭

进入半开状态,会启动一个计数器,记录连续成功的调用次数,超过阈值,进入闭状态,同时清零连续成功调用次数。

实现要点

切到开状态启动的定时器

这里如果使用定时线程来做的话,开的线程多,管理比较麻烦,故这里改为维护一个切换到开状态的时间,在每次方法调用,判断是开状态时,判断是否已经过了这个超时阈值,超过的话,进入半开状态。

半开状态的计数器

目前半开状态没有使用时间窗口,仅仅使用连续成功次数来计算,一旦失败,则将断路器设置为open状态。如果连续成功次数达到阈值,则进入close状态。每次进入half-open的状态时,连续成功的计数器清零。

主要代码 断路器状态

</>复制代码

  1. public enum CircuitBreakerState {
  2. CLOSED, // working normally, calls are transparently passing through
  3. OPEN, // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead
  4. HALF_OPEN // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN
  5. }
带时间窗口的计数器

</>复制代码

  1. /**
  2. * 带时间窗口的限流计数器
  3. */
  4. public class LimitCounter {
  5. private long startTime;
  6. private long timeIntervalInMs;
  7. private int maxLimit;
  8. private AtomicInteger currentCount;
  9. public LimitCounter(long timeIntervalInMs, int maxLimit) {
  10. super();
  11. this.timeIntervalInMs = timeIntervalInMs;
  12. this.maxLimit = maxLimit;
  13. startTime = System.currentTimeMillis();
  14. currentCount = new AtomicInteger(0);
  15. }
  16. public int incrAndGet() {
  17. long currentTime = System.currentTimeMillis();
  18. if ((startTime + timeIntervalInMs) < currentTime) {
  19. synchronized (this) {
  20. if ((startTime + timeIntervalInMs) < currentTime) {
  21. startTime = currentTime;
  22. currentCount.set(0);
  23. }
  24. }
  25. }
  26. return currentCount.incrementAndGet();
  27. }
  28. public boolean thresholdReached(){
  29. return currentCount.get() > maxLimit;
  30. }
  31. public int get(){
  32. return currentCount.get();
  33. }
  34. public /*synchronized*/ void reset(){
  35. currentCount.set(0);
  36. }
  37. }
主要配置

</>复制代码

  1. public class CircuitBreakerConfig {
  2. //closed状态的失败次数阈值
  3. private int failThreshold = 5;
  4. //closed状态的失败计数的时间窗口
  5. private int failCountWindowInMs = 60*1000;
  6. //处于open状态下进入half-open的超时时间
  7. private int open2HalfOpenTimeoutInMs = 5*1000;
  8. //half-open状态下成功次数阈值
  9. private int consecutiveSuccThreshold = 5;
  10. private CircuitBreakerConfig(){
  11. }
  12. public static CircuitBreakerConfig newDefault(){
  13. CircuitBreakerConfig config = new CircuitBreakerConfig();
  14. return config;
  15. }
  16. public int getFailThreshold() {
  17. return failThreshold;
  18. }
  19. public void setFailThreshold(int failThreshold) {
  20. this.failThreshold = failThreshold;
  21. }
  22. public int getFailCountWindowInMs() {
  23. return failCountWindowInMs;
  24. }
  25. public void setFailCountWindowInMs(int failCountWindowInMs) {
  26. this.failCountWindowInMs = failCountWindowInMs;
  27. }
  28. public int getOpen2HalfOpenTimeoutInMs() {
  29. return open2HalfOpenTimeoutInMs;
  30. }
  31. public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) {
  32. this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs;
  33. }
  34. public int getConsecutiveSuccThreshold() {
  35. return consecutiveSuccThreshold;
  36. }
  37. public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) {
  38. this.consecutiveSuccThreshold = consecutiveSuccThreshold;
  39. }
  40. }
断路器

</>复制代码

  1. public class CircuitBreaker {
  2. private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);
  3. private String name;
  4. private CircuitBreakerConfig config;
  5. private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED;
  6. //最近进入open状态的时间
  7. private volatile long lastOpenedTime;
  8. //closed状态下失败次数
  9. private LimitCounter failCount ;
  10. //half-open状态的连续成功次数,失败立即清零
  11. private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);
  12. //构造器
  13. public CircuitBreaker(String name,CircuitBreakerConfig config) {
  14. this.config = config;
  15. this.name = name;
  16. failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold());
  17. }
  18. //状态判断
  19. public boolean isOpen(){
  20. return CircuitBreakerState.OPEN == state;
  21. }
  22. public boolean isHalfOpen(){
  23. return CircuitBreakerState.HALF_OPEN == state;
  24. }
  25. public boolean isClosed(){
  26. return CircuitBreakerState.CLOSED == state;
  27. }
  28. //状态操作
  29. /**
  30. * closed->open | halfopen -> open
  31. */
  32. public void open(){
  33. lastOpenedTime = System.currentTimeMillis();
  34. state = CircuitBreakerState.OPEN;
  35. logger.debug("circuit open,key:{}",name);
  36. }
  37. /**
  38. * open -> halfopen
  39. */
  40. public void openHalf(){
  41. consecutiveSuccCount.set(0);
  42. state = CircuitBreakerState.HALF_OPEN;
  43. logger.debug("circuit open-half,key:{}",name);
  44. }
  45. /**
  46. * halfopen -> close
  47. */
  48. public void close(){
  49. failCount.reset();
  50. state = CircuitBreakerState.CLOSED;
  51. logger.debug("circuit close,key:{}",name);
  52. }
  53. //阈值判断
  54. /**
  55. * 是否应该转到half open
  56. * 前提是 open state
  57. * @return
  58. */
  59. public boolean isOpen2HalfOpenTimeout(){
  60. return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime;
  61. }
  62. /**
  63. * 是否应该从close转到open
  64. * @return
  65. */
  66. public boolean isCloseFailThresholdReached(){
  67. return failCount.thresholdReached();
  68. }
  69. /**
  70. * half-open状态下是否达到close的阈值
  71. * @return
  72. */
  73. public boolean isConsecutiveSuccessThresholdReached(){
  74. return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold();
  75. }
  76. //getter
  77. public void incrFailCount() {
  78. int count = failCount.incrAndGet();
  79. logger.debug("incr fail count:{},key:{}",count,name);
  80. }
  81. public AtomicInteger getConsecutiveSuccCount() {
  82. return consecutiveSuccCount;
  83. }
  84. public CircuitBreakerState getState() {
  85. return state;
  86. }
  87. }
断路器维护的变量

</>复制代码

  1. //最近进入open状态的时间
  2. private volatile long lastOpenedTime;
  3. //closed状态下失败次数
  4. private LimitCounter failCount ;
  5. //half-open状态的连续成功次数,失败立即清零
  6. private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);
基于jdk代理的拦截

</>复制代码

  1. public class CircuitBreakerInvocationHandler implements InvocationHandler{
  2. private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class);
  3. private Object target;
  4. public CircuitBreakerInvocationHandler(Object target) {
  5. this.target = target;
  6. }
  7. //动态生成代理对象
  8. public Object proxy(){
  9. return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this);
  10. }
  11. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  12. GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class);
  13. if(breakerAnno == null){
  14. return method.invoke(target,args);
  15. }
  16. Class[] noTripExs = breakerAnno.noTripExceptions();
  17. int timeout = breakerAnno.timeoutInMs();
  18. int interval = breakerAnno.failCountWindowInMs();
  19. int failThreshold = breakerAnno.failThreshold();
  20. CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault();
  21. if(interval != -1){
  22. cfg.setFailCountWindowInMs(interval);
  23. }
  24. if(failThreshold != -1){
  25. cfg.setFailThreshold(failThreshold);
  26. }
  27. String key = target.getClass().getSimpleName() + method.getName();
  28. CircuitBreaker breaker = CircuitBreakerRegister.get(key);
  29. if(breaker == null){
  30. breaker = new CircuitBreaker(key,cfg);
  31. CircuitBreakerRegister.putIfAbsent(key,breaker);
  32. }
  33. Object returnValue = null;
  34. logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString());
  35. //breaker state
  36. if(breaker.isOpen()){
  37. //判断是否该进入half open状态
  38. if(breaker.isOpen2HalfOpenTimeout()){
  39. //进入half open状态
  40. breaker.openHalf();
  41. logger.debug("method:{} into half open",method.toGenericString());
  42. returnValue = processHalfOpen(breaker,method,args,noTripExs);
  43. }else{
  44. throw new CircuitBreakerOpenException(method.toGenericString());
  45. }
  46. }else if(breaker.isClosed()){
  47. try{
  48. returnValue = method.invoke(target,args);
  49. // 这里看情况是否重置标志
  50. // breaker.close();
  51. }catch (Throwable t){
  52. if(isNoTripException(t,noTripExs)){
  53. throw t;
  54. }else{
  55. //增加计数
  56. breaker.incrFailCount();
  57. if(breaker.isCloseFailThresholdReached()){
  58. //触发阈值,打开
  59. logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString());
  60. breaker.open();
  61. throw new CircuitBreakerOpenException(method.toGenericString());
  62. }else{
  63. throw t;
  64. }
  65. }
  66. }
  67. }else if(breaker.isHalfOpen()){
  68. returnValue = processHalfOpen(breaker,method,args,noTripExs);
  69. }
  70. return returnValue;
  71. }
  72. private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class[] noTripExs) throws Throwable {
  73. try{
  74. Object returnValue = method.invoke(target,args);
  75. breaker.getConsecutiveSuccCount().incrementAndGet();
  76. if(breaker.isConsecutiveSuccessThresholdReached()){
  77. //调用成功则进入close状态
  78. breaker.close();
  79. }
  80. return returnValue;
  81. }catch (Throwable t){
  82. if(isNoTripException(t,noTripExs)){
  83. breaker.getConsecutiveSuccCount().incrementAndGet();
  84. if(breaker.isConsecutiveSuccessThresholdReached()){
  85. breaker.close();
  86. }
  87. throw t;
  88. }else{
  89. breaker.open();
  90. throw new CircuitBreakerOpenException(method.toGenericString(), t);
  91. }
  92. }
  93. }
  94. private boolean isNoTripException(Throwable t,Class[] noTripExceptions){
  95. if(noTripExceptions == null || noTripExceptions.length == 0){
  96. return false;
  97. }
  98. for(Class ex:noTripExceptions){
  99. //是否是抛出异常t的父类
  100. //t java.lang.reflect.InvocationTargetException
  101. if(ex.isAssignableFrom(t.getCause().getClass())){
  102. return true;
  103. }
  104. }
  105. return false;
  106. }
  107. }

</>复制代码

  1. github工程circuit-breaker
参考

martinfowler-CircuitBreaker

microsoft-Circuit Breaker Pattern(必读)

cloud-design-patterns-断路器模式

HystrixCircuitBreaker

熔断器设计模式(实现参考)

Creating a circuit breaker with Spring AOP(实现参考)

alenegro81/CircuitBreaker(参考jdk代理实现)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/65827.html

相关文章

  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西红柿 评论0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西红柿 评论0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    yangrd 评论0 收藏0

发表评论

0条评论

animabear

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<