资讯专栏INFORMATION COLUMN

使用Guava RateLimiter限流以及源码解析

simpleapples / 3037人阅读

摘要:令牌桶算法对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。使用以及源码解析开源工具包提供了限流工具类,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。

前言

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

缓存 缓存的目的是提升系统访问速度和增大系统处理容量

降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开

限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

常用的限流算法 漏桶算法
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

令牌桶算法
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

RateLimiter使用以及源码解析
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,而且十分高效。
RateLimiter使用

首先简单介绍下RateLimiter的使用,

public void testAcquire() {
      RateLimiter limiter = RateLimiter.create(1);

      for(int i = 1; i < 10; i = i + 2 ) {
          double waitTime = limiter.acquire(i);
          System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
      }
  }

输出结果:

cutTime=1535439657427 acq:1 waitTime:0.0
cutTime=1535439658431 acq:3 waitTime:0.997045
cutTime=1535439661429 acq:5 waitTime:2.993028
cutTime=1535439666426 acq:7 waitTime:4.995625
cutTime=1535439673426 acq:9 waitTime:6.999223

首先通过RateLimiter.create(1);创建一个限流器,参数代表每秒生成的令牌数,通过limiter.acquire(i);来以阻塞的方式获取令牌,当然也可以通过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,如果超timeout为0,则代表非阻塞,获取不到立即返回。

从输出来看,RateLimiter支持预消费,比如在acquire(5)时,等待时间是3秒,是上一个获取令牌时预消费了3个两排,固需要等待3*1秒,然后又预消费了5个令牌,以此类推

RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费),在使用过程中需要注意这一点,具体实现原理后面再分析。

RateLimiter实现原理
Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值) 两种模式实现思路类似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty
RateLimiter的创建

通过调用RateLimiter的create接口来创建实例,实际是调用的SmoothBuisty稳定模式创建的实例。

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }

  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

SmoothBursty中的两个构造参数含义:

SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌

maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
 * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
 * 在RateLimiter未使用时,最多存储几秒的令牌
 * */
 final double maxBurstSeconds;
 

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;

/**
 * The maximum number of stored permits.
 * 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
 */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

接下来介绍几个关键函数

setRate

public final void setRate(double permitsPerSecond) {
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}

通过这个接口设置令牌通每秒生成令牌的数量,内部时间通过调用SmoothRateLimiterdoSetRate来实现

doSetRate

@Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

这里先通过调用resync生成令牌以及更新下一期令牌生成时间,然后更新stableIntervalMicros,最后又调用了SmoothBurstydoSetRate

resync

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 * 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(可以理解为生成令牌)
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

另一种解法则是延迟计算,如上resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

SmoothBursty的doSetRate

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don"t special-case this, we would get storedPermits == NaN, below
    // Double.POSITIVE_INFINITY 代表无穷啊
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。也就是流量不局限于qps

RateLimiter几个常用接口分析

在了解以上概念后,就非常容易理解RateLimiter暴露出来的接口

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

/**
* 获取令牌,返回阻塞的时间
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回,主要通过reserve返回需要等待的时间,reserve中通过调用reserveAndGetWaitLength获取等待时间

/**
 * Reserves next ticket and returns the wait time that the caller must wait for.
 *
 * @return the required wait time, never negative
 */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  return max(momentAvailable - nowMicros, 0);
}

最后调用了reserveEarliestAvailable

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros;
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  double freshPermits = requiredPermits - storedPermitsToSpend;
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros);

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
首先通过resync生成令牌以及同步nextFreeTicketMicros时间戳,freshPermits从令牌桶中获取令牌后还需要的令牌数量,通过storedPermitsToWaitTime计算出获取freshPermits还需要等待的时间,在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,然后更新nextFreeTicketMicros以及storedPermits,这次获取令牌需要的等待到的时间点, reserveAndGetWaitLength返回需要等待的时间间隔。

从`reserveEarliestAvailable`可以看出RateLimiter的预消费原理,以及获取令牌的等待时间时间原理(可以解释示例结果),再获取令牌不足时,并没有等待到令牌全部生成,而是更新了下次获取令牌时的nextFreeTicketMicros,从而影响的是下次获取令牌的等待时间。



 `reserve`这里返回等待时间后,`acquire`通过调用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`进行sleep操作,这里不同于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
    //sleep 阻塞线程 内部通过Thread.sleep()
  boolean interrupted = false;
  try {
    long remainingNanos = unit.toNanos(sleepFor);
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        // TimeUnit.sleep() treats negative timeouts just like zero.
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        interrupted = true;
        remainingNanos = end - System.nanoTime();
        //如果被interrupt可以继续,更新sleep时间,循环继续sleep
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
      //如果被打断过,sleep过后再真正中断线程
    }
  }
}
 sleep之后,`acquire`返回sleep的时间,阻塞结束,获取到令牌。


public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}

tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
canAcquire用于判断timeout时间内是否可以获取令牌,通过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否能够拿到足够的令牌数,如果可以获取到,则过程同acquire,线程sleep等待,如果通过canAcquire在此超时时间内不能回去到令牌,则可以快速返回,不需要等待timeout后才知道能否获取到令牌。

因为SmoothBursty允许一定程度的突发,会有人担心如果允许这种突发,假设突然间来了很大的流量,那么系统很可能扛不住这种突发。因此需要一种平滑速率的限流工具,从而系统冷启动后慢慢的趋于平均固定速率(即刚开始速率小一些,然后慢慢趋于我们设置的固定速率)。Guava也提供了SmoothWarmingUp来实现这种需求,其可以认为是漏桶算法,但是在某些特殊场景又不太一样。

SmoothWarmingUp创建方式:

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
  checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
  return create(
      permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}

permitsPerSecond表示每秒新增的令牌数,warmupPeriod表示在从冷启动速率过渡到平均速率的时间间隔,大致原理是类似的,这里就先不分析了。

到此,Guava RateLimiter稳定模式的实现原理基本已经清楚,如发现文中错误的地方,劳烦指正!

上述分析主要参考了:https://segmentfault.com/a/11...,再此基础上做了些笔记补充

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76899.html

相关文章

  • 限流器及Guava实现分析

    摘要:计数限流算法无论固定窗口还是滑动窗口核心均是对请求进行计数,区别仅仅在于对于计数时间区间的处理。令牌桶限流实现原理令牌桶限流的实现原理在有详细说明。因此由此为入口进行分析。目前可返回的实现子类包括及两种,具体不同下文详细分析。 限流 限流一词常用于计算机网络之中,定义如下: In computer networks, rate limiting is used to control t...

    xcc3641 评论0 收藏0
  • 服务限流(自定义注解令牌桶算法)

    摘要:自定义注解实现基于接口限流仔细看会发现上面的简单实现会造成我每个接口都要写一次限流方法代码很冗余所以采用来使用自定义注解来实现。 服务限流 -- 自定义注解基于RateLimiter实现接口限流 令牌桶限流算法showImg(https://segmentfault.com/img/bVbgTi6?w=2420&h=1547);图片来自网上 令牌桶会以一个恒定的速率向固定容量大小桶...

    microcosm1994 评论0 收藏0
  • 基于Redis和Lua的分布式限流

    摘要:和分布式限流本质上是一个集群并发问题,单进程单线程的特性,天然可以解决分布式集群的并发问题。的限流实现是微服务架构的网关组件,它基于和实现了令牌桶算法的限流功能,下面我们就来看一下它的原理和细节吧。基于模式,提供了限流过滤器。  Java单机限流可以使用AtomicInteger,RateLimiter或Semaphore来实现,但是上述方案都不支持集群限流。集群限流的应用场景有两个,...

    verano 评论0 收藏0
  • 接口限流算法:漏桶算法&令牌桶算法

    摘要:令牌桶算法漏桶算法漏桶漏桶的出水速度是恒定的,那么意味着如果瞬时大流量的话,将有大部分请求被丢弃掉也就是所谓的溢出。 工作中对外提供的API 接口设计都要考虑限流,如果不考虑限流,会成系统的连锁反应,轻者响应缓慢,重者系统宕机,整个业务线崩溃,如何应对这种情况呢,我们可以对请求进行引流或者直接拒绝等操作,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。 在开发高并发...

    dendoink 评论0 收藏0
  • RateLimter源码解析

    摘要:允许突发流量的平滑限流器的实现。实例执行结果方法返回结果代表获取所等待的时间。先看下示例代码运行效果为了预防突然暴增的流量将系统压垮,很贴心的增加了预热。 RateLimiter 类图 showImg(https://segmentfault.com/img/bVbflrs?w=1598&h=1076); RateLimiter:作为抽象类提供一个限流器的基本的抽象方法。SmoothR...

    RaoMeng 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<