资讯专栏INFORMATION COLUMN

聊聊Elasticsearch的CircuitBreaker

番茄西红柿 / 3094人阅读

摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执

本文主要研究一下Elasticsearch的CircuitBreaker

CircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/CircuitBreaker.java

</>复制代码

  1. /**

  2. * Interface for an object that can be incremented, breaking after some
  3. * configured limit has been reached.
  4. */
  5. public interface CircuitBreaker {
  6. /**
  7. * The parent breaker is a sum of all the following breakers combined. With
  8. * this we allow a single breaker to have a significant amount of memory
  9. * available while still having a "total" limit for all breakers. Note that
  10. * its not a "real" breaker in that it cannot be added to or subtracted
  11. * from by itself.
  12. */
  13. String PARENT = "parent";
  14. /**
  15. * The fielddata breaker tracks data used for fielddata (on fields) as well
  16. * as the id cached used for parent/child queries.
  17. */
  18. String FIELDDATA = "fielddata";
  19. /**
  20. * The request breaker tracks memory used for particular requests. This
  21. * includes allocations for things like the cardinality aggregation, and
  22. * accounting for the number of buckets used in an aggregation request.
  23. * Generally the amounts added to this breaker are released after a request
  24. * is finished.
  25. */
  26. String REQUEST = "request";
  27. /**
  28. * The in-flight request breaker tracks bytes allocated for reading and
  29. * writing requests on the network layer.
  30. */
  31. String IN_FLIGHT_REQUESTS = "in_flight_requests";
  32. /**
  33. * The accounting breaker tracks things held in memory that is independent
  34. * of the request lifecycle. This includes memory used by Lucene for
  35. * segments.
  36. */
  37. String ACCOUNTING = "accounting";
  38. enum Type {
  39. // A regular or ChildMemoryCircuitBreaker
  40. MEMORY,
  41. // A special parent-type for the hierarchy breaker service
  42. PARENT,
  43. // A breaker where every action is a noop, it never breaks
  44. NOOP;
  45. public static Type parseValue(String value) {
  46. switch(value.toLowerCase(Locale.ROOT)) {
  47. case "noop":
  48. return Type.NOOP;
  49. case "parent":
  50. return Type.PARENT;
  51. case "memory":
  52. return Type.MEMORY;
  53. default:
  54. throw new IllegalArgumentException("No CircuitBreaker with type: " + value);
  55. }
  56. }
  57. }
  58. enum Durability {
  59. // The condition that tripped the circuit breaker fixes itself eventually.
  60. TRANSIENT,
  61. // The condition that tripped the circuit breaker requires manual intervention.
  62. PERMANENT
  63. }
  64. /**
  65. * Trip the circuit breaker
  66. * @param fieldName name of the field responsible for tripping the breaker
  67. * @param bytesNeeded bytes asked for but unable to be allocated
  68. */
  69. void circuitBreak(String fieldName, long bytesNeeded);
  70. /**
  71. * add bytes to the breaker and maybe trip
  72. * @param bytes number of bytes to add
  73. * @param label string label describing the bytes being added
  74. * @return the number of "used" bytes for the circuit breaker
  75. */
  76. double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
  77. /**
  78. * Adjust the circuit breaker without tripping
  79. */
  80. long addWithoutBreaking(long bytes);
  81. /**
  82. * @return the currently used bytes the breaker is tracking
  83. */
  84. long getUsed();
  85. /**
  86. * @return maximum number of bytes the circuit breaker can track before tripping
  87. */
  88. long getLimit();
  89. /**
  90. * @return overhead of circuit breaker
  91. */
  92. double getOverhead();
  93. /**
  94. * @return the number of times the circuit breaker has been tripped
  95. */
  96. long getTrippedCount();
  97. /**
  98. * @return the name of the breaker
  99. */
  100. String getName();
  101. /**
  102. * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
  103. */
  104. Durability getDurability();
  105. }

CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker

NoopCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/NoopCircuitBreaker.java

</>复制代码

  1. public class NoopCircuitBreaker implements CircuitBreaker {

  2. public static final int LIMIT = -1;
  3. private final String name;
  4. public NoopCircuitBreaker(String name) {
  5. this.name = name;
  6. }
  7. @Override
  8. public void circuitBreak(String fieldName, long bytesNeeded) {
  9. // noop
  10. }
  11. @Override
  12. public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
  13. return 0;
  14. }
  15. @Override
  16. public long addWithoutBreaking(long bytes) {
  17. return 0;
  18. }
  19. @Override
  20. public long getUsed() {
  21. return 0;
  22. }
  23. @Override
  24. public long getLimit() {
  25. return LIMIT;
  26. }
  27. @Override
  28. public double getOverhead() {
  29. return 0;
  30. }
  31. @Override
  32. public long getTrippedCount() {
  33. return 0;
  34. }
  35. @Override
  36. public String getName() {
  37. return this.name;
  38. }
  39. @Override
  40. public Durability getDurability() {
  41. return Durability.PERMANENT;
  42. }
  43. }

NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作

ChildMemoryCircuitBreaker

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/breaker/ChildMemoryCircuitBreaker.java

</>复制代码

  1. public class ChildMemoryCircuitBreaker implements CircuitBreaker {

  2. private final long memoryBytesLimit;
  3. private final double overheadConstant;
  4. private final Durability durability;
  5. private final AtomicLong used;
  6. private final AtomicLong trippedCount;
  7. private final Logger logger;
  8. private final HierarchyCircuitBreakerService parent;
  9. private final String name;
  10. /**
  11. * Create a circuit breaker that will break if the number of estimated
  12. * bytes grows above the limit. All estimations will be multiplied by
  13. * the given overheadConstant. This breaker starts with 0 bytes used.
  14. * @param settings settings to configure this breaker
  15. * @param parent parent circuit breaker service to delegate tripped breakers to
  16. * @param name the name of the breaker
  17. */
  18. public ChildMemoryCircuitBreaker(BreakerSettings settings, Logger logger,
  19. HierarchyCircuitBreakerService parent, String name) {
  20. this(settings, null, logger, parent, name);
  21. }
  22. /**
  23. * Create a circuit breaker that will break if the number of estimated
  24. * bytes grows above the limit. All estimations will be multiplied by
  25. * the given overheadConstant. Uses the given oldBreaker to initialize
  26. * the starting offset.
  27. * @param settings settings to configure this breaker
  28. * @param parent parent circuit breaker service to delegate tripped breakers to
  29. * @param name the name of the breaker
  30. * @param oldBreaker the previous circuit breaker to inherit the used value from (starting offset)
  31. */
  32. public ChildMemoryCircuitBreaker(BreakerSettings settings, ChildMemoryCircuitBreaker oldBreaker,
  33. Logger logger, HierarchyCircuitBreakerService parent, String name) {
  34. this.name = name;
  35. this.memoryBytesLimit = settings.getLimit();
  36. this.overheadConstant = settings.getOverhead();
  37. this.durability = settings.getDurability();
  38. if (oldBreaker == null) {
  39. this.used = new AtomicLong(0);
  40. this.trippedCount = new AtomicLong(0);
  41. } else {
  42. this.used = oldBreaker.used;
  43. this.trippedCount = oldBreaker.trippedCount;
  44. }
  45. this.logger = logger;
  46. if (logger.isTraceEnabled()) {
  47. logger.trace("creating ChildCircuitBreaker with settings {}", settings);
  48. }
  49. this.parent = parent;
  50. }
  51. /**
  52. * Method used to trip the breaker, delegates to the parent to determine
  53. * whether to trip the breaker or not
  54. */
  55. @Override
  56. public void circuitBreak(String fieldName, long bytesNeeded) {
  57. this.trippedCount.incrementAndGet();
  58. final String message = "[" + this.name + "] Data too large, data for [" + fieldName + "]" +
  59. " would be [" + bytesNeeded + "/" + new ByteSizeValue(bytesNeeded) + "]" +
  60. ", which is larger than the limit of [" +
  61. memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
  62. logger.debug("{}", message);
  63. throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit, durability);
  64. }
  65. /**
  66. * Add a number of bytes, tripping the circuit breaker if the aggregated
  67. * estimates are above the limit. Automatically trips the breaker if the
  68. * memory limit is set to 0. Will never trip the breaker if the limit is
  69. * set < 0, but can still be used to aggregate estimations.
  70. * @param bytes number of bytes to add to the breaker
  71. * @return number of "used" bytes so far
  72. */
  73. @Override
  74. public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
  75. // short-circuit on no data allowed, immediately throwing an exception
  76. if (memoryBytesLimit == 0) {
  77. circuitBreak(label, bytes);
  78. }
  79. long newUsed;
  80. // If there is no limit (-1), we can optimize a bit by using
  81. // .addAndGet() instead of looping (because we dont have to check a
  82. // limit), which makes the RamAccountingTermsEnum case faster.
  83. if (this.memoryBytesLimit == -1) {
  84. newUsed = noLimit(bytes, label);
  85. } else {
  86. newUsed = limit(bytes, label);
  87. }
  88. // Additionally, we need to check that we havent exceeded the parents limit
  89. try {
  90. parent.checkParentLimit((long) (bytes * overheadConstant), label);
  91. } catch (CircuitBreakingException e) {
  92. // If the parent breaker is tripped, this breaker has to be
  93. // adjusted back down because the allocation is "blocked" but the
  94. // breaker has already been incremented
  95. this.addWithoutBreaking(-bytes);
  96. throw e;
  97. }
  98. return newUsed;
  99. }
  100. private long noLimit(long bytes, String label) {
  101. long newUsed;
  102. newUsed = this.used.addAndGet(bytes);
  103. if (logger.isTraceEnabled()) {
  104. logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: [-1b]]",
  105. this.name, new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed));
  106. }
  107. return newUsed;
  108. }
  109. private long limit(long bytes, String label) {
  110. long newUsed;// Otherwise, check the addition and commit the addition, looping if
  111. // there are conflicts. May result in additional logging, but its
  112. // trace logging and shouldnt be counted on for additions.
  113. long currentUsed;
  114. do {
  115. currentUsed = this.used.get();
  116. newUsed = currentUsed + bytes;
  117. long newUsedWithOverhead = (long) (newUsed * overheadConstant);
  118. if (logger.isTraceEnabled()) {
  119. logger.trace("[{}] Adding [{}][{}] to used bytes [new used: [{}], limit: {} [{}], estimate: {} [{}]]",
  120. this.name,
  121. new ByteSizeValue(bytes), label, new ByteSizeValue(newUsed),
  122. memoryBytesLimit, new ByteSizeValue(memoryBytesLimit),
  123. newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead));
  124. }
  125. if (memoryBytesLimit > 0 && newUsedWithOverhead > memoryBytesLimit) {
  126. logger.warn("[{}] New used memory {} [{}] for data of [{}] would be larger than configured breaker: {} [{}], breaking",
  127. this.name,
  128. newUsedWithOverhead, new ByteSizeValue(newUsedWithOverhead), label,
  129. memoryBytesLimit, new ByteSizeValue(memoryBytesLimit));
  130. circuitBreak(label, newUsedWithOverhead);
  131. }
  132. // Attempt to set the new used value, but make sure it hasnt changed
  133. // underneath us, if it has, keep trying until we are able to set it
  134. } while (!this.used.compareAndSet(currentUsed, newUsed));
  135. return newUsed;
  136. }
  137. /**
  138. * Add an exact number of bytes, not checking for tripping the
  139. * circuit breaker. This bypasses the overheadConstant multiplication.
  140. *
  141. * Also does not check with the parent breaker to see if the parent limit
  142. * has been exceeded.
  143. *
  144. * @param bytes number of bytes to add to the breaker
  145. * @return number of "used" bytes so far
  146. */
  147. @Override
  148. public long addWithoutBreaking(long bytes) {
  149. long u = used.addAndGet(bytes);
  150. if (logger.isTraceEnabled()) {
  151. logger.trace("[{}] Adjusted breaker by [{}] bytes, now [{}]", this.name, bytes, u);
  152. }
  153. assert u >= 0 : "Used bytes: [" + u + "] must be >= 0";
  154. return u;
  155. }
  156. /**
  157. * @return the number of aggregated "used" bytes so far
  158. */
  159. @Override
  160. public long getUsed() {
  161. return this.used.get();
  162. }
  163. /**
  164. * @return the number of bytes that can be added before the breaker trips
  165. */
  166. @Override
  167. public long getLimit() {
  168. return this.memoryBytesLimit;
  169. }
  170. /**
  171. * @return the constant multiplier the breaker uses for aggregations
  172. */
  173. @Override
  174. public double getOverhead() {
  175. return this.overheadConstant;
  176. }
  177. /**
  178. * @return the number of times the breaker has been tripped
  179. */
  180. @Override
  181. public long getTrippedCount() {
  182. return this.trippedCount.get();
  183. }
  184. /**
  185. * @return the name of the breaker
  186. */
  187. @Override
  188. public String getName() {
  189. return this.name;
  190. }
  191. /**
  192. * @return whether a tripped circuit breaker will reset itself (transient) or requires manual intervention (permanent).
  193. */
  194. @Override
  195. public Durability getDurability() {
  196. return this.durability;
  197. }
  198. }

ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException

addEstimateBytesAndMaybeBreak方法首先判断memoryBytesLimit,如果为0,则执行circuitBreak方法;如果为-1则调用noLimit,否则调用limit计算newUsed,没有抛出异常的话,则最后执行 parent.checkParentLimit方法

noLimit方法直接执行this.used.addAndGet(bytes);limit方法首先计算newUsed,然后根据overheadConstant得出newUsedWithOverhead,如果newUsedWithOverhead大于memoryBytesLimit则执行circuitBreak方法,否则将newUsed更新到this.used中

小结

CircuitBreaker定义了Type、Durability枚举;它还定义了circuitBreak、addEstimateBytesAndMaybeBreak、addWithoutBreaking、getUsed、getLimit、getOverhead、getTrippedCount等方法;它有两个实现类分别是NoopCircuitBreaker、ChildMemoryCircuitBreaker

NoopCircuitBreaker实现了CircuitBreaker接口,它不做任何操作

ChildMemoryCircuitBreaker实现了CircuitBreaker接口;其circuitBreak方法会抛出CircuitBreakingException;addEstimateBytesAndMaybeBreak方法则先判断newUsed是否超出memoryBytesLimit,超出则执行circuitBreak方法,最后执行parent.checkParentLimit方法

doc

CircuitBreaker

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/7097.html

相关文章

  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    番茄西红柿 评论0 收藏0
  • 聊聊ElasticsearchCircuitBreaker

    摘要:序本文主要研究一下的定义了枚举它还定义了等方法它有两个实现类分别是实现了接口,它不做任何操作实现了接口其方法会抛出方法首先判断,如果为,则执行方法如果为则调用,否则调用计算,没有抛出异常的话,则最后执序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...

    yangrd 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<