如果你曾经使用过API来获取服务,那么你可能经受过与速率限制相抗衡。速率限制使得某种资源每次访问的次数受限。资源可以是任何东西:API连接,磁盘I/O,网络包,错误。

你有没有想过为什么会需要制定速率限制?为什么不允许无限制地访问系统?最明显的答案是,通过对系统进行速率限制,可以防止整个系统遭受攻击。如果恶意用户可以在他们的资源允许的情况下极快地访问你的系统,那么他们可以做各种事情。

例如,他们可以用日志消息或有效的请求填满服务器的磁盘。如果你的日志配置错误,他们甚至可能会执行恶意的操作,然后提交足够的请求,将任何活动记录从日志中移出并放入/dev/null中导致日志系统完全崩溃。他们可能试图暴力访问资源,或者执行分布式拒绝服务攻击。如果你没有对系统进行请求限制,你的系统就成了一个走在街上不穿衣服的大美女。

更糟糕的是,非恶意请求有时也会造成上述结果。恶意使用并不是唯一的原因。 在分布式系统中,如果合法用户正在以足够高的速度执行操作,或者他们正在运行的代码有问题,则合法用户可能会降低系统的性能。这甚至可能导致我们之前讨论的死亡螺旋。 从产品的角度来看,这太糟糕了!通常情况下,你希望向用户提供某种类型的性能保证,以确保他们可以在一致的基础上达到不错的性能。如果一个用户可以影响该协议,那么你的日子就不好过了。用户对系统的访问通常被沙盒化,既不会影响其他用户的活动也不会受其他用户影响。如果你打破了这种思维模式,你的系统会表现出糟糕的设计使用感,甚至会导致用户生气或离开。

我曾经在一个分布式系统上工作,通过启动新的进程来并行扩展(这允许它水平扩展到多台机器)。每个进程都会打开数据库连接,读取一些数据并进行一些计算。一段时间以来,我们在以这种方式扩展系统以满足客户需求方面取得了巨大成功。 但是,一段时间后,我们发现大量的数据库中读取数据超时。
我们的数据库管理员仔细查看了日志,试图找出问题所在。最后他们发现,由于系统中没有设定任何速率限制,所以进程彼此重叠。由于不同的进程试图从磁盘的不同部分读取数据,因此磁盘竞争将达到100%并一直保持在这个水平。这反过来导致了一系列的循环——超时——重试——循环。工作永远不会完成。
我们设计了一个系统来限制数据库上可能的连接数量,并且速率限制被放在连接可以读取的秒级单位,问题消失了。虽然客户不得不等待更长的时间,但毕竟他们的工作完成了,我们能够在接下来的时间里进行适当的容量规划,以系统化的方式扩展容量。

速率限制使得你可以通过某个界限来推断系统的性能和稳定性。如果你需要扩展这些边界,可以在大量测试后以可控的方式进行。

在密集用户操作的情况下,速率限制可以使你的系统与用户之间保持良好的关系。你可以允许他们在严格限制的速率下尝试系统的特性。Google的云产品证明了这一点,这种限制在某种意义上是可以保护用户的。

速率限制通常由资源的构建者角度来考虑,但对于用户来说,能够减少速率限制的影响会让其感到非常欣慰。

那么我们怎样用Go来实现呢?

大多数速率限制是通过使用称为令牌桶的算法完成的。这很容易理解,而且相对容易实施。 我们来看看它背后的理论。

假设要使用资源,你必须拥有资源的访问令牌。没有令牌,请求会被拒绝。想象这些令牌存储在等待被检索以供使用的桶中。该桶的深度为d,表示它一次可以容纳d个访问令牌。

现在,每次你需要访问资源时,你都会进入存储桶并删除令牌。如果你的存储桶包含五个令牌,那么您可以访问五次资源。在第六次访问时,没有访问令牌可用,那么必须将请求加入队列,直到令牌变为可用,或拒绝请求。

这里有一个时间表来帮助观察这个概念。time表示以秒为单位的时间增量,bucket表示桶中请求令牌的数量,并且请求列中的tok表示成功的请求。(在这个和未来的时间表中,我们假设这些请求是即时的。)

可以看到,在第一秒之前,我们能够完成五个请求,然后我们被阻塞,因为没有更多的令牌可供使用。

到目前为止,这些都很容易理解。那么如何补充令牌呢?在令牌桶算法中,我们将r定义为将令牌添加回桶的速率。 它可以是一纳秒或一分钟。它就是我们通常认为的速率限制:因为我们必须等待新的令牌可用,我们将操作限制为令牌的刷新率。

以下是深度为1的令牌桶示例,速率为1令牌/秒:

可以看到我们立即可以提出请求,但是只能隔一秒钟将请求一次。速率限制执行的非常好!

所以我们现在有两个设置可以摆弄:有多少令牌可用于立即使用,即桶的深度d,以及它们补充的速率r。另外我们还可以考虑下当存储桶已满时可以进行多少次请求。

以下是深度为5的令牌桶示例,速率为0.5令牌/秒:

在这里,我们能够立即提出五个请求,在这之后,每两秒限制一次请求。请求的爆发是在桶最初装满的时候。

请注意,用户可能不会消耗一个长数据流中的整个令牌桶。桶的深度只控制桶的容量。这里有一个用户爆发两次请求的例子,然后四秒钟后爆发了五次:


虽然用户有可用的令牌,但这种爆发性仅允许根据调用者的能力限制访问系统。对于只能间歇性访问系统但希望尽快往返的用户来说,这种爆发性的存在是很好的。你只需要确保系统能够同时处理所有用户的爆发操作,或者确保操作上不可能有足够的用户同时爆发操作以影响你的系统。无论哪种方式,速率限制都可以让你规避风险。

让我们使用这个算法来看看一个Go程序在执行令牌桶算法的时如何表现。

假设我们可以访问API,并且已经提供了客户端来使用它。该API有两个操作:一个用于读取文件,另一个用于将域名解析为IP地址。为了简单起见,我将忽略任何参数并返回实际访问服务所需的值。 所以这是我们的客户端:

func Open() *APIConnection {
    return &APIConnection{}
}

type APIConnection struct{}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    // Pretend we do work here
    return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    // Pretend we do work here
    return nil
}

由于理论上这个请求正在通过网络传递,所以我们将context.Context作为第一个参数,以防我们需要取消请求或将值传递给服务器。 通过前面的章节讨论,想必大家已经熟悉了这样的用法。

现在我们将创建一个简单的程序来访问这个API。程序需要读取10个文件并解析10个地址,但这些文件和地址彼此没有关系,因此程序可以并发调用。稍后这将有助于添加利率限制。

func main() {

    defer log.Printf("Done.")

    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)

    apiConnection := Open()
    var wg sync.WaitGroup
    wg.Add(20)

    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            err := apiConnection.ReadFile(context.Background())
            if err != nil {
                log.Printf("cannot ReadFile: %v", err)
            }
        }()

    }
    log.Printf("ReadFile")

    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            err := apiConnection.ResolveAddress(context.Background())
            if err != nil {
                log.Printf("cannot ResolveAddress: %v", err)

            }
        }()
    }
    log.Printf("ResolveAddress")

    wg.Wait()
}

这会输出:

20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 Done.

我们可以看到所有的API请求几乎同时进行。由于没有设定速率限制,所以用户可以随意随意访问系统。现在我需要提醒你,当前个可能导致无限循环的错误。

好的,让我们来介绍一个限速器。我打算在APIConnection中这样做,但通常会在服务器上运行速率限制器,这样用户就无法绕过它。生产系统还可能包括一个客户端速率限制器,以防止用户因不必要的调用而被拒绝,但这是一种优化,并不是必需的。就我们的目的而言,限速器使事情变得简单。

我们将看看 golang.org/x/time/rate 中的令牌桶速率限制器实现的示例。我选择了这个包,因为这跟我所能得到的标准库较为相近。当然还有其他的软件包可以做更多的花里胡哨的工作。 golang.org/x/time/rate 非常简单,而且它适用于我们的目的。

我们将与这个包交互的前两种方法是Limit类型和NewLimiter函数,在这里定义:

// Limit 定义了事件的最大频率。
// Limit 被表示为每秒事件的数量。
// 值为0的Limit不允许任何事件。
type Limit float64

// ewLimiter返回一个新的Limiter实例,
// 件发生率为r,并允许至多b个令牌爆发。
func NewLimiter(r Limit, b int) *Limiter

在NewLimiter中,我们看到了两个熟悉的参数:r ,以及 b.r。b是我们之前讨论过的桶的深度。

rate 包还定义了一个有用的函数,Every,帮助将time.Duration转换为Limit:

// Every将事件之间的最小时间间隔转换为Limit。
func Every(interval time.Duration) Limit

Every函数是有意义的,但我想讨论每次rate限制了操作次数,而不是请求之间的时间间隔。 我们可以将其表述如下:

rate.Limit(events/timePeriod.Seconds())

但是我不想每次都输入这个值,Every函数有一些特殊的逻辑会返回rate.Inf——表示没有限制——如果提供的时间间隔为零。 正因为如此,我们将用这个词来表达我们的帮助函数:

func Per(eventCount int, duration time.Duration) rate.Limit {
    return rate.Every(duration/time.Duration(eventCount))
}

在我们建立rate.Limiter后,我们希望使用它来阻塞我们的请求,直到获得访问令牌。 我们可以用Wait方法来做到这一点,它只是用参数1来调用WaitN:

// Wait 是 WaitN(ctx, 1)的简写。
func (lim *Limiter) Wait(ctx context.Context)

// WaitN 会发生阻塞直到 lim 允许的 n 个事件执行。
// 它返回一个 error 如果 n 超过了 Limiter的桶大小,
// Context会被取消, 或等待的时间超过了 Context 的 Deadline。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

我们已经集齐了限制API请求的所有要素。让我们修改APIConnection类型并尝试一下:

func Open() *APIConnection {
    return &APIConnection{
        rateLimiter: rate.NewLimiter(rate.Limit(1), 1) //1
    }
}

type APIConnection struct {
    rateLimiter *rate.Limiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    if err := a.rateLimiter.Wait(ctx); err != nil { //2
        return err
    }
    // Pretend we do work here
    return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    if err := a.rateLimiter.Wait(ctx); err != nil { //2
        return err
    }
    // Pretend we do work here
    return nil
}
  1. 我们将所有API连接的速率限制设置为每秒一个事件。
  2. 我们等待速率限制器有足够的权限来完成我们的请求。

这会输出:

22:08:30 ResolveAddress
22:08:31 ReadFile
22:08:32 ReadFile
22:08:33 ReadFile
22:08:34 ResolveAddress
22:08:35 ResolveAddress
22:08:36 ResolveAddress
22:08:37 ResolveAddress
22:08:38 ResolveAddress
22:08:39 ReadFile
22:08:40 ResolveAddress
22:08:41 ResolveAddress
22:08:42 ResolveAddress
22:08:43 ResolveAddress
22:08:44 ReadFile
22:08:45 ReadFile
22:08:46 ReadFile
22:08:47 ReadFile
22:08:48 ReadFile
22:08:49 ReadFile
22:08:49 Done.

在生产中,我们可能会想要一些更复杂的东西。我们可能希望建立多层限制:细粒度控制以限制每秒请求数,粗粒度控制限制每分钟,每小时或每天的请求数。

在某些情况下,可以通过速率限制器来实现; 然而,在所有情况下都这么干是不可能的,并且通过尝试将单位时间的限制语义转换为单一层,你会因速率限制器而丢失大量信息。出于这些原因,我发现将限制器分开并将它们组合成一个限速器来管理交互更容易。 为此,我创建了一个简单的聚合速率限制器multiLimiter。 这是定义:

type RateLimiter interface { //1
    Wait(context.Context) error
    Limit() rate.Limit
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
    byLimit := func(i, j int) bool {
        return limiters[i].Limit() < limiters[j].Limit()
    }
    sort.Slice(limiters, byLimit) //2
    return &multiLimiter{limiters: limiters}

}

type multiLimiter struct {
    limiters []RateLimiter
}

func (l *multiLimiter) Wait(ctx context.Context) error {
    for _, l := range l.limiters {
        if err := l.Wait(ctx); err != nil {
            return err
        }
    }
    return nil
}

func (l *multiLimiter) Limit() rate.Limit {
    return l.limiters[0].Limit() //3
}
  1. 这里我们定义一个RateLimiter接口,以便MultiLimiter可以递归地定义其他MultiLimiter实例。
  2. 这里我们实现一个优化,并按照每个RateLimiter的Limit()行排序。
  3. 因为我们在multiLimiter实例化时对子RateLimiter实例进行排序,所以我们可以简单地返回限制性最高的limit,这将是切片中的第一个元素。

Wait方法遍历所有子限制器,并在每一个子限制器上调用Wait。这些调用可能会也可能不会阻塞,但我们需要通知每个子限制器,以便减少令牌桶内的令牌数量。通过等待每个限制器,我们保证最长的等待时间。这是因为如果我们执行时间较小的等待,那么最长的等待时间将被重新计算为剩余时间。在较早的等待被阻塞时,后者会等待令牌桶的填充。

经过思考,让我们重新定义APIConnection,对每秒和每分钟都进行限制:

func Open() *APIConnection {
    secondLimit := rate.NewLimiter(Per(2, time.Second), 1)   //1
    minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) //2
    return &APIConnection{
        rateLimiter: MultiLimiter(secondLimit, minuteLimit) //3
    }
}

type APIConnection struct {
    rateLimiter RateLimiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    if err := a.rateLimiter.Wait(ctx); err != nil {
        return err
    }
    // Pretend we do work here
    return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    if err := a.rateLimiter.Wait(ctx); err != nil {
        return err
    }
    // Pretend we do work here
    return nil
}
  1. 我们定义了每秒的极限。
  2. 我们定义每分钟的突发极限为10,为用户提供初始池。每秒的限制将确保我们1不会因请求而使系统过载。
  3. 我们结合这两个限制,并将其设置为APIConnection的主限制器。

这会输出:

22:46:10 ResolveAddress
22:46:10 ReadFile
22:46:11 ReadFile
22:46:11 ReadFile
22:46:12 ReadFile
22:46:12 ReadFile
22:46:13 ReadFile
22:46:13 ReadFile
22:46:14 ReadFile
22:46:14 ReadFile
22:46:16 ResolveAddress
22:46:22 ResolveAddress
22:46:28 ReadFile
22:46:34 ResolveAddress
22:46:40 ResolveAddress
22:46:46 ResolveAddress
22:46:52 ResolveAddress
22:46:58 ResolveAddress
22:47:04 ResolveAddress
22:47:10 ResolveAddress
22:47:10 Done.

正如您所看到的,我们每秒发出两个请求,直到请求#11,此时我们开始每隔六秒发出一次请求。 这是因为我们耗尽了我们可用的每分钟请求令牌池,并受此限制。

为什么请求#11在两秒内发生而不是像其他请求那样发生,这可能有点违反直觉。 请记住,尽管我们将API请求限制为每分钟10个,但一分钟是一个动态的时间窗口。 当我们达到第十一个要求时,我们的每分钟限制器已经累积了另一个令牌。

通过定义这样的限制,我们可以清楚地表达了粗粒度限制,同时仍然以更详细的细节水平限制请求数量。

这种技术也使我们能够开始思考除时间之外的其他维度。 当你对系统进行限制时,你可能会限制不止一件事。 也可能对API请求的数量有一些限制,也会对其他资源(如磁盘访问,网络访问等)有限制。让我们稍微充实一下示例并设置磁盘和网络限制

func Open() *APIConnection {
    return &APIConnection{
        apiLimit: MultiLimiter( //1
            rate.NewLimiter(Per(2, time.Second), 2),
            rate.NewLimiter(Per(10, time.Minute), 10)),
        diskLimit:    MultiLimiter(rate.NewLimiter(rate.Limit(1), 1)),      //2
        networkLimit: MultiLimiter(rate.NewLimiter(Per(3, time.Second), 3)) //3
    }
}

type APIConnection struct {
    networkLimit,
    diskLimit,
    apiLimit RateLimiter
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx) //4
    if err != nil {
        return err
    }
    // Pretend we do work here
    return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx) //5
    if err != nil {
        return err
    }
    // Pretend we do work here
    return nil
}
  1. 我们为API调用设置了一个限制器。 每秒请求数和每分钟请求数都有限制
  2. 我们为磁盘读取设置一个限制器。将其限制为每秒一次读取。
  3. 对于网络,我们将设置每秒三个请求的限制。
  4. 当我们读取文件时,将结合来自API限制器和磁盘限制器的限制。
  5. 当我们需要网络访问时,将结合来自API限制器和网络限制器的限制。

这会输出:

01:40:15 ResolveAddress
01:40:15 ReadFile
01:40:16 ReadFile
01:40:17 ResolveAddress
01:40:17 ResolveAddress
01:40:17 ReadFile
01:40:18 ResolveAddress
01:40:18 ResolveAddress
01:40:19 ResolveAddress
01:40:19 ResolveAddress
01:40:21 ResolveAddress
01:40:27 ResolveAddress
01:40:33 ResolveAddress
01:40:39 ReadFile
01:40:45 ReadFile
01:40:51 ReadFile
01:40:57 ReadFile
01:41:03 ReadFile
01:41:09 ReadFile
01:41:15 ReadFile
01:41:15 Done.

让我们关注一下这样的事实,即我们可以将限制器组合成对每个调用都有意义的组,并且让APIClient执行正确的操作。如果我们想随便观察一下它是如何工作的,会注意到涉及网络访问的API调用似乎以更加规律的方式发生,并在前三分之二的调用中完成。这可能与goroutine调度有关,也是我们的限速器正在执行各自的工作。

我还应该提到的是,rate.Limiter类型有一些其他的技巧来优化不同的用例。这里只讨论了它等待令牌桶接收另一个令牌的能力,但如果你有兴趣使用它,可以查阅文档。

在本节中,我们考察了使用速率限制的理由,构建了令牌桶算法的Go实现,以及如何将令牌桶限制器组合为更大,更复杂的速率限制器。这应该能让你很好地了解速率限制,并帮助你开始使用它们。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng