要解决的事情是,在一段时间内允许操作的数量。一般有3中方式实现,在讨论过程中,加入分布式这种限制条件。

计数器

使用redis的zset可以轻松做到。key/score采用时间戳的方式,可以快速知道过去一段时间内的流量,以及删除时间外的信息。

from app.extensions import redis
import time


class RateLimiter:
    def __init__(self, name, period, count_limit):
        self.redis_key = f"ratelimit.zset.{name}"
        self.period = period
        self.count_limit = count_limit

    def is_over_limit(self, quota=1):
        now = int(time.time())
        with redis.pipeline() as pipe:
            pipe.zadd(self.redis_key, {now: now for i in range(quota)})
            pipe.zremrangebyscore(self.redis_key, 0, now - self.period)
            pipe.zcard(self.redis_key)
            pipe.expire(self.redis_key, self.period + 1)
            _, _, current_count, _ = pipe.execute()
        return current_count >= self.count_limit


def test():
    rate_limiter = RateLimiter("test_limiter", 1, 10)
    for i in range(10):
        assert not rate_limiter.is_over_limit()
    assert rate_limiter.is_over_limit()


if __name__ == "__main__":
    test()


漏桶

流量到达之后,将其数量加入到一个桶中,如果桶满,则需要限速;桶不满则通过。还需记录一个桶中还有多少剩余,以及上次漏桶时间。在限速操作之前对其进行漏桶处理。

import time


class RateLimiterLeaking:
    def __init__(self, max_quota, issue_rate, init_token):
        self.max_quota = max_quota
        self.issue_rate = issue_rate
        self.last_issue_time = time.time()
        self.exist_token = init_token

    def is_over_limit(self, quota):
        now_time = time.time()
        self.exist_token += (now_time - self.last_issue_time) * self.issue_rate
        self.last_issue_time = now_time
        if self.exist_token > self.max_quota:
            self.exist_token = self.max_quota
        if self.exist_token - quota < 0:
            return True
        self.exist_token -= quota
        return False


令牌桶

每隔一段时间向一个桶中放入令牌。当流量到来时,从令牌桶中取出若干令牌。如果取不出,则限速,取出则不限速。桶的容量是固定的。

import time


class RateLimiterLeaking:
    def __init__(self, max_quota, issue_rate, init_token):
        self.max_quota = max_quota
        self.issue_rate = issue_rate
        self.last_issue_time = time.time()
        self.exist_token = init_token

    def is_over_limit(self, quota):
        now_time = time.time()
        self.exist_token += (now_time - self.last_issue_time) * self.issue_rate
        self.last_issue_time = now_time
        if self.exist_token > self.max_quota:
            self.exist_token = self.max_quota
        if self.exist_token - quota < 0:
            return True
        self.exist_token -= quota
        return False


总结

  1. 可以看出计数器方式占用空间比较大,后两种方式,动态变化的只有两个值,空间占用小。如果考虑使用redis实现的话,可以使用redis+lua脚本。

  2. 如果使用到redis+lua脚本,由于redis是有过期时间的,可以天生作为“漏桶”使用,可以不用记录时间的变量。将key的过期时间设置为1s,max_count作为1s最大限流数量即可。