当你编写一个程序时,你可能不会坐下来写一个长函数——至少我希望你不要! 你会以函数,结构体,方法等形式构造抽象。为什么要这样做? 部分是为了抽象出无关的细节,另一部分是为了能够在不影响其他区域的情况下处理某个代码区域。你没有必要改变整个系统,当发现自己必须调整多个区域才能做出一个合乎逻辑的改变时,说明该系统的抽象实在是很糟糕。

pipline,又名管道,或者叫流水线,是你可以用来在系统中形成抽象的另一种工具。特别是当你的程序需要处理流或批处理数据时,它是一个非常强大的工具。管道这个词被认为是在1856年首次使用的,指将液体从一个地方输送到另一个地方的一系列管道。计算机科学借用了这个术语,因为我们也在从一个地方向另一个地方传输某些东西:数据。管道是个系统,它将一系列数据输入,执行操作并将数据传回。我们称这些操作都是管道的一个阶段。

通过使用管道,你可以分离每个阶段的关注点,这提供了许多好处。你可以独立于彼此修改模块,你可以混合搭配模块的组合方式,而无需修改模块,你可以让每个模块同时处理到上游或下游模块,并且可以扇出或限制你的部分管道。 我们将在”扇出,扇入”一节中介绍扇出,我们将在第5章介绍速率限制。你现在不必担心这些奇怪的术语; 让我们从最简单的开始,尝试构建一个管道。

如前所述,一个阶段只是类似于执行将数据输入,对其进行转换并将数据发回这样的功能。 这是一个可以被视为管道某阶段的例子:

multiply := func(values []int, multiplier int) []int {
    multipliedValues := make([]int, len(values))
    for i, v := range values {
        multipliedValues[i] = v * multiplier
    }
    return multipliedValues
}

这个函数用取整数切片,循环遍历它们,然后返回一个新的切片。看起来很无聊的功能,对吧? 让我们创建管道的另一个阶段:

add := func(values []int, additive int) []int {
    addedValues := make([]int, len(values))
    for i, v := range values {
        addedValues[i] = v + additive
    }
    return addedValues
}

跟上个函数类似,只不过把乘法变成了加法。接下来,让我们尝试将它们合并:

ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
    fmt.Println(v)
}

这会输出:

3
5
7
9

看看我们是如何将他们结合起来的。这些函数就像你每天工作使用的函数一样,但是我们可以将它们组合起来形成一个流水线。 那么我们怎么定义管道的“阶段”呢?

  • 一个阶段消耗并返回相同的类型。
  • 一个阶段必须通过语言来体现,以便它可以被传递。 Go的函数已经实现并很好地适用于这个目的。

那些熟悉函数式编程的人可能会点头并思考像高阶函数和monad这样的术语。 事实上,管道的各阶段与函数式编程密切相关,可以被认为是monad的一个子集。 我不会在这里明确地讨论Monad或函数式编程,但它们本身就是有趣的主题,在尝试理解管道时,对这两个主题的工作知识虽然没有必要,但是对于加强理解是有用的。

在这里,我们的add和multiply满足管道的阶段属性:它们都消耗int切片并返回int切片,并且因为Go支持函数传递,所以我们可以传递add和multiply。 这些属性很有趣:在不改变阶段本身的情况下,我们可以很容易地将我们的阶段结合到更高层次。

例如,如果我们现在想为管道添加一个额外的阶段:乘以2,只需将我们以前的管道包装在一个新的阶段内,就像这样:

ints := []int{1, 2, 3, 4}
for _, v := range multiply(add(multiply(ints, 2), 1), 2) {
    fmt.Println(v)
}

这会输出:

6
10
14
18

注意我们是如何做到这一点的。也许你已经开始看到使用管道模式的好处了。 当然,我们也可以在程序上编写此代码:

ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(2 * (v*2 + 1))
}

虽然这看起来简单得多,但正如我们接下来会看到的,程序在处理数据流时不会提供与管道相同的好处。

请注意每个阶段如何获取数据切片并返回数据的。这些阶段的行为我们称为批处理。这意味着它们一次性对大块数据进行操作,而不是一次一个单独进行。还有另一种类型的管道,模块每次仅接收和返回单个元素。

批处理和流处理各有优点和缺点,我们稍微讨论一下。现在,请注意,为使原始数据保持不变,每个阶段都必须创建一个等长的新切片来存储其计算结果。这意味着我们的程序在任何时候的内存占用量都是我们发送到管道开始处的切片大小的两倍。 让我们转换为面向流操作,看看会有什么效果:

multiply := func(value, multiplier int) int {
    return value * multiplier
}

add := func(value, additive int) int {
    return value + additive
}

ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}

这会输出:

6
10
14
18

每个阶段都接收并返回一个值,程序的内存占用空间将回落到管道输入数据的大小。但是我们不得不将管道放入到for循环的体内,这让我们的操作变“重”了。这不仅限制了我们对管道的重复使用,而且我们稍后会在本节中看到,这也限制了我们的扩展能力。 实际上,我们因为循环而在每次迭代中实例化我们的管道。尽管进行函数调用耗费的资源很少,但函数的调用次数确实增加了。那么如果涉及到并发性又如何?我之前说过,使用管道的好处之一是能够同时处理数据的各个阶段,并且我提到了一些关于扇出的内容。这些我们在接下来会进一步了解到。

我会扩展add和multiply来介绍这些概念。现在开始学习在Go中构建管道的最佳实践的时候了,先从并发原语通道开始。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng