通道非常适合在Go中构建管道,因为它们满足了我们所有的基本要求。它们可以接收并传递值,它们可以在并发中安全的使用,它们可以被遍历,而且它们被语言给予了完美的支持。让我们用通道将之前的例子改造一下:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i * multiplier:
            }
        }
    }()

    return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i + additive:
            }
        }
    }()
    return addedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
    fmt.Println(v)
}

这会输出:

6
10
14
18

看起来我们得到了期望的输出结果,但代价是代码更多了。先,我们来看看我们写的是什么。 现在有三个函数,而不是两个。他们都看起来像是在内部开启一个通道,并使用我们在“防止Goroutine泄漏”中建立的模式,通过一个done通道表示该通道应该退出。他们都看起来像返回通道,其中一些看起来像他们也采用了额外的通道。让我们开始进一步分解:

done := make(chan interface{})
defer close(done)

我们的程序首先创建了done通道,并调用close通过defer延迟执行。正如前面所讨论的那样,这可以确保我们的程序干净地退出,而不泄漏goroutines。没有什么新鲜的。接下来,我们来看看函数generator:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

// ...

intStream := generator(done, 1, 2, 3, 4)

generator接收整数类型的切片,构造整数类型的通道,启动一个goroutine并返回构造的通道。然后,在创建的goroutine通道上发送切片的值。

请注意,通道上的发送与done通道上的选择共享一条select语句。这是我们在“防止Goroutine泄漏”中建立的模式。

简而言之,generator函数将一组离散值转换为一个通道上的数据流。这种操作的函数我们称之为生成器。在使用管道时,你会经常看到这一点,因为在管道开始时,你总是会有一些需要转换为通道的数据。我们将稍微介绍一些有趣的生成器的几个例子,但我们先来完成对这个程序的分析。 接下来,我们构建管道:

pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

这与本节之前的流程相同:对于一串数字,我们将它们乘以2,加1,然后将结果乘以2。这条管道与我们前面例子中使用函数的管道相似,但它在很重要的方面有所不同。

首先,我们正在使用通道。 这是显而易见的,因为它允许两件事:在我们的管道的末尾,可以使用range语句来提取值,并且在每个阶段我们可以安全地并发执行,因为我们的输入和输出在并发上下文中是安全的。

这给我们带来了第二个区别:管道的每个阶段都在同时执行。 这意味着任何阶段只需要等待其输入,并且能够发送其输出。 事实证明,这会产生巨大的影响,我们将在“扇出,扇入”一节中发现,但现在我们可以简单地注意到它允许各阶段相互独立地执行一段时间。

最后,在我们的例子中,我们对这个管道进行了遍历取值:

for v := range pipeline {
    fmt.Println(v)
}

下面是一个表格,演示系统中的每个值如何进入每个通道,以及通道何时关闭。

让我们更仔细地研究一下这个模式来标示goroutines退出。当处理多个相互依赖的goroutines时,这种模式如何起作用? 如果我们在程序完成执行之前在完成的通道上调用close,会发生什么情况?

要回答这些问题,再来看看管道是构建的这一行:

pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

管道的各阶段通过两种方式连接在一起:通过默认的done通道,和被传递给后续阶段的通道。换句话说,multiply函数创建的通道被传递给add函数。让我们重新审视前面的表格,并在完成之前,关闭done通道,看看会发生什么:

看到关闭done通道是如何影响到管道的了么?这是通过管道每个阶段的两件事情实现的:

  • 对传入的频道进行遍历。当输入通道关闭时,遍历操作将退出。
  • 发送操作与done通道共享select语句。

无论流水线阶段处于等待数据通道的状态,还是处在等待发送通道关闭的状态,都会强制管道各阶段终止。
这里有一个复发关系。在管道开始时,我们已经确定必须将传入的切片值转换为通道。在这个过程中有两点必须是可抢占的:

  • 在生成器通道上创建值。
  • 在其频道上发送离散值。

在我们的例子中,在生成器函数中,离散值是通过遍历切片生成的,它足够快,不需要被抢占。 第二个是通过我们的select语句和done通道处理的,它确保发生器即使被阻塞试图写入intStream也是可抢占的。

在管道的另一端,同样我们可以确保最终阶段的可抢占性。因为我们正在操作的通道在抢占时会被关闭,所以当这种情况发生时,通道将会中断。 最后阶段是可抢占的,因为我们依赖的流是可抢占的。

在管道开始和结束之间,代码总是在一个通道上遍历,并在包含done通道的select语句内的另一个通道上发送。

如果某个阶段在传入通道检索到值时被阻塞,则该通道关闭时它将变为未阻塞状态。 如果某个阶段在发送值时被阻塞,则由于select语句而可抢占。

因此,我们的整个管道始终可以通过关闭done通道来抢占。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng