那么你已经建立了一条管道。 数据在你的系统中欢畅地流动,并在莫连接在一起的各个阶段发生变化。 它就像一条美丽的溪流; 一个美丽的,缓慢的溪流,哦,我的上帝为什么这需要这么久?

有时候,管道中的各个阶段可能在计算上特别耗费资源。当发生这种情况时,管道中的上游阶段可能会在等待完成时被阻塞。不仅如此,管道本身可能需要很长时间才能整体执行。 我们如何解决这个问题?

管道的一个有趣属性是它的各个阶段相互独立,方便组合。你可以多次重复使用管道的各个阶段。因此,在多个goroutine上重用管道的单个阶段实现并行化,将有助于提高管道的性能。

事实上,这种模式被称为扇入扇出。

扇出(Fan-out)是一个术语,用于描述启动多个goroutines以处理来自管道的输入的过程,并且扇入(fan-in)是描述将多个结果组合到一个通道中的过程的术语。

那么在什么情况下适用于这种模式呢?如果出现以下两种情况,你就可以考虑这么干了:

  • 不依赖模块之前的计算结果。
  • 运行需要很长时间。

运行的独立性是非常重要的,因为你无法保证各阶段的并发程序以何种顺序运行,也无法保证其返回的顺序。

我们来看一个例子。在下面的例子中,构建了一个寻找素数的方法。我们将使用在“管道”中的经验,创建各个阶段,并将它们拼接在一起:

rand := func() interface{} { return rand.Intn(50000000) }

done := make(chan interface{})
defer close(done)

start := time.Now()

randIntStream := toInt(done, repeatFn(done, rand))
fmt.Println("Primes:")
for prime := range take(done, primeFinder(done, randIntStream), 10) {
    fmt.Printf("\t%d\n", prime)
}

fmt.Printf("Search took: %v", time.Since(start))

这会输出:

Primes:
24941317
36122539
6410693
10128161
25511527
2107939
14004383
7190363
45931967
2393161
Search took: 23.437511647s

我们生成一串随机数,最大值为50000000,将数据流转换为整数流,然后将其传入primeFinder。pri meFinder会尝试将输入流提供的数字除以比它小的每个数字。如果不成功,会将该值传递到下一个阶段。当然,这个方法很低效,但它符合我们程序运行时间较长的要求。

在我们的for循环中,搜索找到的素数,在进入时将它们打印出来,并且take在找到10个素数后关闭管道。然后,我们打印出搜索需要多长时间,完成的通道被延迟声明关闭,管道停止 。

为了避免结果中出现重复,我们可以把已找到的素数缓存起来,但为了简单起见,我们将忽略这些。

你可以看到大概需要23秒才能找到10个素数,这实在是有点慢。通常遇到这种情况,我们首先看一下算法本身,也许是拿一本算法书籍,然后看看我们是否能在哪个阶段改进。但是,由于目的是通过扇出来解决该问题,所以算法我们暂时先不去管它。

我们的程序现在有两个阶段:生成随机数和筛选素数。在更大的程序中,你的管道可能由更多的阶段组成,那我们该对什么样的阶段使用扇出模式进行改进?请记住我们之前提出的标准:执行顺序的独立性和执行时间。我们的随机数生成器肯定是与顺序无关的,但运行起来并不需要很长的时间。PrimeFinder阶段也是顺序无关的,因为我们采用的算法效率非常低下,它需要很长时间才能运行完成。因此,我们可以把关注点放在PrimeFinder身上。

为此,我们可以将其操作拆散,就像这样:

numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i++ {
    finders[i] = primeFinder(done, randIntStream)
}

在我的电脑上,runtime.NumCPU()返回8,在生产中,我们可能会做一些经验性的测试来确定CPU的最佳数量,但在这里我们将保持简单,并且假设只有一个findPrimes阶段的CPU会被占用。

这就好像一个班级的作业,原本由1位老师批改,现在变成了8位老师同时批改。

接下来我们遇到的问题是,如何将结果汇总到一起。为此,我们开始考虑使用扇入(fan-in)。

正如我们前面所提到的,扇入意味着将多个数据流复用或合并成一个流。 这样做相对简单:

fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} { // 1

    var wg sync.WaitGroup // 2
    multiplexedStream := make(chan interface{})

    multiplex := func(c <-chan interface{}) { // 3
        defer wg.Done()
        for i := range c {
            select {
            case <-done:
                return
            case multiplexedStream <- i:
            }
        }
    }

    // 从所有的通道中取数据
    wg.Add(len(channels)) // 4
    for _, c := range channels {
        go multiplex(c)
    }

    // 等待所有数据汇总完毕
    go func() { // 5
        wg.Wait()
        close(multiplexedStream)
    }()

    return multiplexedStream
}
  1. 一如既往,我们使用done通道来关闭衍生的goroutine,并接收接口类型的通道切片来汇总数据。
  2. 这里我们使用sync.WaitGroup以等待全部通道读取完成。
  3. 我们在这里建立函数multiplex,它会读取传入的通道,并把该通道的值放入multiplexedStream。
  4. 这里增加等待计数。
  5. 这里我们建立一个goroutine等待汇总完毕。这样函数块可以快速return,不必等待wg.Wait()。这种用法不多见,但在这里很符合场景需求。

简而言之,扇入涉及读取多路复用通道,然后为每个传入通道启动一个goroutine,以及在传入通道全部关闭时关闭复用通道。由于我们要创建一个等待N个其他goroutine完成的goroutine,因此创建sync.WaitGroup来协调处理是有意义的。multiplex还通知WaitGroup它已执行完成。

额外提醒,在对返回结果的顺序有要求的情况下扇入扇出可能工作的不是很好。我们没有做任何事情来保证从randIntStream中读取数据的顺序。稍后,我们将看一个维护顺序的例子。

让我们把所有这些改进放在一起,看看运行时长是否有所减少:

done := make(chan interface{})
defer close(done)

start := time.Now()

rand := func() interface{} { return rand.Intn(50000000) }

randIntStream := toInt(done, repeatFn(done, rand))

numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
fmt.Println("Primes:")
for i := 0; i < numFinders; i++ {
    finders[i] = primeFinder(done, randIntStream)
}

for prime := range take(done, fanIn(done, finders...), 10) {
    fmt.Printf("\t%d\n", prime)
}

fmt.Printf("Search took: %v", time.Since(start))

这会输出:

Spinning up 8 prime finders. Primes:
6410693
24941317
10128161
36122539
25511527
2107939
14004383
7190363
2393161
45931967
Search took: 5.438491216s

最大降幅23秒,这简直是个壮举。运用扇入扇出可以在不大幅改变程序结构的前提下将运行时间缩短了大约78%。

最后编辑: kuteng  文档更新时间: 2021-01-02 17:30   作者:kuteng