限流的方法

Rate Limiter

Posted by LiuShuo on October 26, 2019

资源总是有限的,所以任何「获取」行为都不能无限度的满足,需要限制流量。就好比使用保险丝,它会在电流异常升高到一定的高度和热度的时候,自身熔断切断电流,保护了电路安全运行。

流量的特点

流量并不会按照我们预期的一样固定的时间、频率和大小到来,对于流量来说,没有时间周期的概念,任何一个时间点都可能出现大量的请求。

如果服务器维护一种类似「单位时间窗口(如1s)最多允许访问m次」的算法,可能只是一厢情愿的想法,因为在两个「单位窗口」交接点处的左右两个极限点可能出现m次的请求高峰。 它既满足左边的时间窗口的m次限制,也满足右边的窗口的m次限制,但是如果将时间窗口的范围包含一下这两个点时,就会发现请求量是2m,超过了服务器的阈值。

问题出现在哪里?

计算机可以处理「瞬时」的概念,而人由于脑力有限,喜欢用QPS来形容和理解问题。

限流的方法

信号量

信号量实际上就是通过限制系统的「瞬间」并发量来达到限流的目的,它只能限制瞬间的请求个数的上限,对于不同瞬间的请求量之间的关系没影响。 即系统可能在任何时刻都能打到最高的上限,如果请求耗时很短且系统服务能力无限的情况下,QPS会非常高。

所以信号量并不能限制系统的QPS,它只解决瞬间的峰值问题。

在Java里常见的用法是:创建Semaphore,指定permit的数量。在方法开始时,调用Semaphore.acquire()或者Semaphore.tryAcquire() 来获取permit,并在方法返回前,调用Semaphore.release()来返还permit。

另外一种思路是使用原子性的自增变量,如 AtomicInteger。每次执行方法前增+1并判断是否超过阈值,超过则执行其他逻辑。但会用finally块来-1, 保证最终不会让自增变量超过阈值。

如果用作客户端的熔断措施,使用信号量机制还是很方便的,保证了最高有多少个请求同时执行,过载直接返回默认值。

类似的,Nginx的ngx_http_limit_conn_module中的limit_conn参数也是用来限制某一个IP的瞬时并发数的。

计数器

既然是计数,那么一定是有一个时间范围的概念,即固定时间范围内允许执行的任务的个数,时间周期到则统计指标自动清零,开始新一轮的技术。

比如限制1秒钟内请求数最多为10个,每当进来一个请求,则计数器+1。当计数器达到上限时,则触发限流。时间每经过1秒,则重置计数器。

计数器的问题在于,它的设计是基于一个一个的时间周期的概念,不同时间周期之间没有影响,但对于服务器来说,它最关心的还是瞬时请求的问题。 在计数器场景中,两个时间周期的交界处的左右两个周期内可能集中的使用了这个周期内的所有资源,导致这两个里的非常近的时间点的瞬间请求非常大。

simple_RateLimiter.png

所以计数器也不是用来限制系统的QPS的,它的使用场景更多的是分配资源,1分钟内柜台只放1个号,你可以在任何一个时间点来领取。

滑动窗口

滑动窗口本质上也是一种计数器,只不过它的粒度更细。比如限制QPS为1000,设定窗口大小为10,则每个窗口的时间间隔为100ms。每次窗口滑动时,重置的是前1s至900ms 之间内的计数,而不是完整的1s。

滑动窗口和计数器的问题是一样的,它解决不了瞬间的问题,不能将瞬间承载不了的压力设置为整个窗口时间周期的指标,因为任何一个时间点都可以获取整个窗口周期内允许的资源。

漏桶算法

漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。 leaky_bucket 因为漏桶的漏出速率是固定的,因此它对于存在突发特性的流量来说缺乏效率,也正因为如此,漏铜的速率可以很好的控制瞬间的压力阈值。

基于漏桶(桶+恒定处理速率),可以起到对请求整流效果。漏桶算法可基于线程池来实现,线程池使用固定容量的阻塞队列+固定个数的处理线程来实现;最简单且最常见的漏桶思想的实现就是基于SynchronousQueue的线程池,其相当于一个空桶+固定处理线程。

注意:原生的漏桶算法以恒定速度出水(处理请求),但是实际场景中请求的处理耗时可能不相等,为了实现恒定速率,一般都是限定同时处理请求的最大线程数。

令牌桶算法

令牌桶算法的原理是,系统以固定的速率往令牌桶中放入令牌;当请求进来时,则从桶中取走令牌;当桶中令牌为空时,触发限流。 token_bucket 令牌桶与漏桶相比,好处在于它支持突发流量,缺点是突发流量可能超过它的瞬间处理能力导致服务崩溃。

所以令牌通算法更多的是提供一种「平滑」的控制流量的能力,它允许一定的峰值并将后续的请求安排到更远的周期内执行,从而降低对单位时间内的服务的压力。

令牌桶算法的一个实现方案是:启动一个Timer线程以固定频率往桶中放令牌,桶满时令牌溢出,业务线程在获取令牌时直接从桶中获取即可。该方案容易理解,但是需要一个Timer线程,资源占用较重。

令牌桶算法还有一种实现方案不需要用 Timer 线程,这个经典实现就是Guava中的RateLimiter。

SmoothBursty

Guava的RateLimiter接口的一个实现类 SmoothBursty (平滑突发限流)就是基于令牌桶的,允许突发的流量进入,后面再慢慢地平稳限流。

实现原理如下简述:

  • startTick 记录RateLimiter初始化时的时间戳(单位微秒),后续 nowMicros (当前时间点)都是取(System.nanoTime()-startTick) /1000;
  • nextFreeTicketMicros 记录下次可获取令牌的开始时间点,在RateLimiter初始化和获取到令牌之后会进行更新;如果 nowMicros 大于等于 nextFreeTicketMicros 表示可以获取令牌;
  • 如果 nowMicros 大于 nextFreeTicketMicros ,会计算二者差值并除以放一个令牌的周期,然后赋值给 storedPermits 字段(表示当前桶中令牌数,注意不能超过桶容量);
  • 然后 storedPermits 减去当前需要令牌数,如果此时要获取令牌数大于 storedPermits ,那么会将 nextFreeTicketMicros 再往后推进「 (要获取令牌 - storedPermits) * 放一个令牌的周期」 的时间。

测试代码:

1
2
RateLimiter rateLimiter = RateLimiter.create(1.0);
Arrays.asList(6, 2, 6).forEach(num -> System.out.println(System.currentTimeMillis() + " wait " + rateLimiter.acquire(num)));

输出:

1572005389241 wait 0.0 1572005389251 wait 5.989325 1572005395246 wait 1.994102

可以看出,SmoothBursty允许获取的令牌数量可以超过最大令牌数的限制,但是之后获取令牌的请求需要等待一定的时间来补充之前「透支」的令牌。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* The currently stored permits.
*/
double storedPermits;

/**
* The maximum number of stored permits.
*/
double maxPermits;

/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;

/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future

create

1
2
3
4
5
6
7
8
9
10
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);  // maxBurstSeconds 用于计算 maxPermits
    rateLimiter.setRate(permitsPerSecond); // 设置生成令牌的速率
    return rateLimiter;
}

setRate

1
2
3
4
5
6
7
public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

注意这个方法是一个同步方法,锁住了 mutex 方法的返回值。

doSetRate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// SmoothRateLimiter类中的doSetRate方法,覆写了 RateLimiter 类中的 doSetRate 方法
// 此方法再委托下面的 doSetRate 方法做处理。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

// SmoothBursty 和 SmoothWarmingUp 类中覆写此方法
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

// SmoothBursty 中对 doSetRate的实现
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
    } else {
        storedPermits =
                (oldMaxPermits == 0.0)
                        ? 0.0 // initial state
                        : storedPermits * maxPermits / oldMaxPermits;
    }
}

resync

1
2
3
4
5
6
7
8
private void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
        storedPermits = min(maxPermits, newPermits);
        nextFreeTicketMicros = nowMicros;
    }
}

resync 方法就是 RateLimiter惰性计算 的实现(所以无需 Timer 来提前生成,性能更好)。

每一次请求来的时候,都会调用到这个方法。这个方法的过程大致如下:

  • 首先判断当前时间是不是大于 nextFreeTicketMicros ,如果是则代表系统已经「cool down」, 这两次请求之间应该有新的 permit 生成。
  • 然后计算本次请求应该触发新添加 permit 的数量,这里分式的分母是 coolDownIntervalMicros 方法,它是一个抽象方法。在 SmoothBurstySmoothWarmingUp 中分别有不同的实现。 SmoothBursty 中返回的是 stableIntervalMicros 也即是 1 / QPScoolDownIntervalMicros 方法在 SmoothWarmingUp 中的计算方式为 warmupPeriodMicros / maxPermitswarmupPeriodMicrosSmoothWarmingUp 的「预热」时间。
  • 计算 storedPermits ,这个逻辑比较简单,取 maxPermitsnewPermits 的最小值,即无论「cool down」了多长时间最多的 storePermits 都是 maxPermits 不会再多。
  • 设置 nextFreeTicketMicrosnowMicros ,即向后推动该时间戳(谁让它已经过期了呢)。

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
        long nowMicros = stopwatch.readMicros();
        // 首先判断当前超时时间之内请求能否被满足,不能满足的话直接返回失败
        if (!canAcquire(nowMicros, timeoutMicros)) { 
            return false;
        } else {
            // 计算本次请求需要等待的时间,核心方法
            microsToWait = reserveAndGetWaitLength(permits, nowMicros); 
        }
    }
    // 阻塞等待microsToWait时间
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
}

// 返回等待的时间
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

// 是否可以在timeoutMicros时间内等待到执行获取令牌的机会(不管多少,反正可以预支)
private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

// 默认就是下一个时间点
final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros; 
}

canAcquire 方法逻辑比较简单,就是看 nextFreeTicketMicros 减去 timeoutMicros 是否小于等于 nowMicros 。如果是,则说明在未到 nowMicros+timeoutMicros 时就可以获取,返回true。

reserveEarliestAvailable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 计算本次请求需要等待的时间
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 本次请求和上次请求之间间隔的时间是否应该有新的令牌生成,所以参数是当前时间戳,与当前请求多少个无关
    // 如果有则更新 storedPermits 和 nextFreeTicketMicros
    resync(nowMicros); // 不一定会触发计算
    long returnValue = nextFreeTicketMicros; 
    
    // 本次请求的令牌数 requiredPermits 由两个部分组成:storedPermits 和 freshPermits
    // storedPermits 是令牌桶中已有的令牌
    // freshPermits 是需要新生成的令牌数
    
    // 需要从已经有的令牌中拿出去多少,最多就是storedPermits个
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // 如果storedPermits不够则需要生成freshPermits个新的
    double freshPermits = requiredPermits - storedPermitsToSpend;
    
    // 分别计算从两个部分拿走的令牌各自需要等待的时间,然后总和作为本次请求需要等待的时间,
    // SmoothBursty 中从 storedPermits 拿走的部分不需要等待时间
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
            
    // 更新 nextFreeTicketMicros,这里更新的其实是下一次请求的时间,是一种“预消费”
    // 使用当前值和waitMicros的和更新为新的值
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    
    // 更新 storedPermits
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

/**
* Translates a specified portion of our currently stored permits which we want to spend/acquire,
* into a throttling time. Conceptually, this evaluates the integral of the underlying function we
* use, for the range of [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

上面的代码是 SmoothRateLimiter 中的具体实现。其主要有以下步骤:

  • resync,其主要用来计算当前请求和上次请求之间这段时间需要生成新的 permit 数量。
  • 对于 requiredPermitsRateLimiter 将其分为两个部分:storedPermitsfreshPermitsstoredPermits 代表令牌桶中已经存在的令牌,可以直接拿出来用,freshPermits 代表本次请求需要新生成的 permit 数量。
  • 分别计算 storedPermitsfreshPermits 拿出来的部分的令牌数所需要的时间,对于 freshPermits 部分的时间比较好计算:直接拿 freshPermits 乘以 stableIntervalMicros 就可以得到。而对于需要从 storedPermits 中拿出来的部分则计算比较复杂,这个计算逻辑在 storedPermitsToWaitTime 方法中实现。 storedPermitsToWaitTime 方法在 SmoothBurstySmoothWarmingUp 中有不同的实现。 storedPermitsToWaitTime 意思就是表示当前请求从 storedPermits 中拿出来的令牌数需要等待的时间, 因为 SmoothBursty 中没有“热身”的概念, storedPermits 中有多少个就可以用多少个,不需要等待, 因此 storedPermitsToWaitTime 方法在 SmoothBursty 中返回的是 0
  • 计算到了本次请求需要等待的时间 waitMicros 之后,会将这个时间加到 nextFreeTicketMicros 中去。最后从 storedPermits 减去本次请求从这部分拿走的令牌数量。
  • reserveEarliestAvailable 方法返回的是本次请求需要等待的时间,该方法中算出来的 waitMicros 按理来说是应该作为返回值的,但是这个方法返回的却是开始计算时的 nextFreeTicketMicros ,而算出来的 waitMicros 则被累加到 nextFreeTicketMicros 中去了。这里其实就是“预消费”,让下一次消费来为本次消费来“买单”。

SmoothWarmingUp

属性

1
2
3
4
5
6
7
8
9
10
11
static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;
    private double thresholdPermits;
    private double coldFactor;
    ...
}

storedPermitsToWaitTime

SmoothWarmingUp 相对 SmoothBursty 来说主要区别在于 storedPermitsToWaitTime 方法。其他部分原理和 SmoothBursty 类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    // measuring the integral on the right part of the function (the climbing line)
    if (availablePermitsAboveThreshold > 0.0) { // 如果当前 storedPermits 超过 availablePermitsAboveThreshold 则计算从 超过部分拿令牌所需要的时间(图中的 WARM UP PERIOD)
        // WARM UP PERIOD 部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2”
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length = permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0); // 计算出从 WARM UP PERIOD 拿走令牌的时间
        permitsToTake -= permitsAboveThresholdToTake; // 剩余的令牌从 stable 部分拿
    }
    // measuring the integral on the left part of the function (the horizontal line)
    micros += (stableIntervalMicros * permitsToTake); // stable 部分令牌获取花费的时间
    return micros;
}

// WARM UP PERIOD 部分 获取相应令牌所对应的的时间
private double permitsToTime(double permits) {
    return stableIntervalMicros + permits * slope;
}

SmoothWarmingUp 类中 storedPermitsToWaitTime 方法将 permitsToTake 分为两部分,一部分从 WARM UP PERIOD 部分拿,这部分是一个梯形,面积计算就是(上底 + 下底)* 高 / 2。另一部分从 stable 部分拿,它是一个长方形,面积就是 长 * 宽。最后返回两个部分的时间总和。

摘取注释文档中画的图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 *          ^ throttling
 *          |
 * 3*stable +                  /
 * interval |                 /.
 *  (cold)  |                / .
 *          |               /  .   <-- "warmup period" is the area of the trapezoid between
 * 2*stable +              /   .       halfPermits and maxPermits
 * interval |             /    .
 *          |            /     .
 *          |           /      .
 *   stable +----------/  WARM . }
 * interval |          .   UP  . } <-- this rectangle (from 0 to maxPermits, and
 *          |          . PERIOD. }     height == stableInterval) defines the cooldown period,
 *          |          .       . }     and we want cooldownPeriod == warmupPeriod
 *          |---------------------------------> storedPermits
 *              (halfPermits) (maxPermits)
 *

简单来说,上图展示了一种机制:当前存储的令牌数(storedPermits)越多,生成令牌的间隔时间就越长。 当存储的令牌数到达最大值(maxPermits)生成令牌的间隔时间也到达最大值(cold interval)。 cold interval同时受stable interval和coldFactor的影响,是两者的乘积(coldFactor默认为3.0, 即cold interval是stable interval的3倍)。 thresholdPermits是一个拐点,当令牌数小于thresholdPermits时生成令牌的间隔时间稳定在stable interval; 当令牌数大于thresholdPermits时,生成令牌的间隔时间以一个固定的速率发生变化。 thresholdPermits等于预热时间内产生令牌数量的一半。

SmoothWarmingUp 实现预热缓冲的关键在于其分发令牌的速率会随时间和令牌数而改变,速率会先慢后快。 表现形式如上图所示,令牌刷新的时间间隔由长逐渐变短。等存储令牌数从 maxPermits 到达 thresholdPermits 时,发放令牌的时间价格也由 coldInterval 降低到了正常的 stableInterval

测试代码:

1
2
3
4
5
6
7
8
9
10
11
RateLimiter rateLimiter = RateLimiter.create(5, 1, TimeUnit.SECONDS);
IntStream.iterate(1, x -> x + 1)
        .limit(10).
        forEach(x -> System.out.println(System.currentTimeMillis() + " wait " + rateLimiter.acquire()));

Thread.sleep(5000L);

IntStream.iterate(1, x -> x + 1)
        .limit(10).
        forEach(x -> System.out.println(System.currentTimeMillis() + " wait " + rateLimiter.acquire()));

结果不再粘贴了,可以自己试一下,可以看出平滑预热限流的耗时是慢慢趋近平均值的。

下面是QPS=4,warmup为2秒时候对应的图:

SmoothWarmingUp

可以算出梯形部分的面积就是(0.25+0.75)*4/2=2,即warmup的2秒。

全局限流

Guava只是作了单机的限流方案,并不支持集群的限流。当然,我们可以根据总体QPS/机器数来做临时方案。不过该方案取决于前端负载均衡的平衡情况,而且当应用增减机器时,需要动态调整该参数,并不十分方便。

开源的可以考虑使用阿里的SentinelResilience4J

References

本文首次发布于 LiuShuo’s Blog, 转载请保留原文链接.